Skip to content

Commit

Permalink
fix NO-SOURCE in loops test tasks, reduce verbosity of some tests
Browse files Browse the repository at this point in the history
This commit corrects the loops additional test task so that it doesn't
exclude ALL sources and correctly on include *Loop tests.

TestNG task is similarly configured but still excludes all due to a few
tests failing. Fixing these tests will be the target of a follow-up
issue.
  • Loading branch information
simonbasle committed Dec 18, 2017
1 parent 067c9f1 commit 14d607b
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 71 deletions.
20 changes: 15 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -334,15 +334,24 @@ project('reactor-core') {
from("${project.buildDir}/docs/kdoc")
}

task loops(type: Test) {
exclude '**/*'
task loops(type: Test, group: 'verification') {
includes.clear()
include '**/*Loop.*'
doFirst {
println "Additional tests from `loops` ($includes)"
}
}

task testNG(type: Test) {
exclude '**/*'
include '**/*Verification.*'
task testNG(type: Test, group: 'verification') {
useTestNG()

//FIXME smaldini the testNG test randomly fail on local, fail on CI
exclude '**/*.*'
includes.clear()
include '**/*Verification.*'
doFirst {
println "Additional tests from `testNG` ($includes)"
}
}

sourceSets.test.resources.srcDirs = ["src/test/resources", "src/test/java"]
Expand All @@ -369,6 +378,7 @@ project('reactor-core') {
archives kdocZip
}

test.dependsOn testStaticInit
jacocoTestReport.dependsOn testNG
check.dependsOn jacocoTestReport
jar.finalizedBy(japicmp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ public void afterTerminateCallbackFatalIsThrownDirectly() {

@Test
public void afterTerminateCallbackErrorAndErrorCallbackError() {
IllegalStateException err = new IllegalStateException("afterTerminate");
IllegalStateException err = new IllegalStateException("expected afterTerminate");
IllegalArgumentException err2 = new IllegalArgumentException("error");

FluxPeekFuseable<String> flux = new FluxPeekFuseable<>(
Expand Down Expand Up @@ -793,7 +793,6 @@ public void should_reduce_to_10_events() {
})
.blockLast();

System.out.println(rs);
assertEquals(10, count.get());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;

import org.junit.Test;
import reactor.core.CoreSubscriber;
Expand Down Expand Up @@ -45,7 +46,7 @@ public void testSubscribeOnValueFusion() {
StepVerifier.create(Flux.range(1, 100)
.flatMap(f -> Flux.just(f)
.subscribeOn(Schedulers.parallel())
.log()
.log("testSubscribeOnValueFusion", Level.FINE)
.map(this::slow)))
.expectFusion(Fuseable.ASYNC, Fuseable.NONE)
.expectNextCount(100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.logging.Level;

import org.assertj.core.api.Assertions;
import org.junit.Assert;
Expand Down Expand Up @@ -646,12 +647,12 @@ public void manualRequestWindowUntilOverRequestingSourceByPrefetch() {

Flux<Integer> source = Flux.range(1, 20)
.doOnRequest(req::addAndGet)
.log()
.log("source", Level.FINE)
.hide();

StepVerifier.create(source.windowUntil(i -> i % 5 == 0, false, prefetch)
.concatMap(w -> w, 1)
.log("downstream"), 0)
.log("downstream", Level.FINE), 0)
.thenRequest(2)
.expectNext(1, 2)
.thenRequest(6)
Expand All @@ -670,12 +671,12 @@ public void manualRequestWindowWhileOverRequestingSourceByPrefetch() {

Flux<Integer> source = Flux.range(1, 20)
.doOnRequest(req::addAndGet)
.log("source")
.log("source", Level.FINE)
.hide();

StepVerifier.create(source.windowWhile(i -> i % 5 != 0, prefetch)
.concatMap(w -> w.log(), 1)
.log("downstream"), 0)
.concatMap(w -> w.log("window", Level.FINE), 1)
.log("downstream", Level.FINE), 0)
.thenRequest(2)
.expectNext(1, 2)
.thenRequest(6)
Expand All @@ -696,13 +697,13 @@ public void windowWhileOneByOneStartingDelimiterReplenishes() {
StepVerifier.create(
source
.doOnRequest(req::addAndGet)
.log("source")
.log("source", Level.FINE)
.windowWhile(s -> !"#".equals(s), 2)
.log("windowWhile")
.log("windowWhile", Level.FINE)
.concatMap(w -> w.collectList()
.log("window")
.log("window", Level.FINE)
, 1)
.log("downstream")
.log("downstream", Level.FINE)
, StepVerifierOptions.create().checkUnderRequesting(false).initialRequest(1))
.expectNextMatches(List::isEmpty)
.thenRequest(1)
Expand All @@ -726,13 +727,13 @@ public void windowWhileUnboundedStartingDelimiterReplenishes() {
StepVerifier.create(
source
.doOnRequest(req::addAndGet)
.log("source")
.log("source", Level.FINE)
.windowWhile(s -> !"#".equals(s), 2)
.log("windowWhile")
.log("windowWhile", Level.FINE)
.concatMap(w -> w.collectList()
.log("window")
.log("window", Level.FINE)
, 1)
.log("downstream")
.log("downstream", Level.FINE)
)
.expectNextMatches(List::isEmpty)
.assertNext(l -> assertThat(l).containsExactly("1A", "1B", "1C"))
Expand All @@ -752,13 +753,13 @@ public void windowUntilUnboundedStartingDelimiterReplenishes() {
StepVerifier.create(
source
.doOnRequest(req::addAndGet)
.log("source")
.log("source", Level.FINE)
.windowUntil(s -> "#".equals(s), false, 2)
.log("windowUntil")
.log("windowUntil", Level.FINE)
.concatMap(w -> w.collectList()
.log("window")
.log("window", Level.FINE)
, 1)
.log("downstream")
.log("downstream", Level.FINE)
)
.assertNext(l -> assertThat(l).containsExactly("#"))
.assertNext(l -> assertThat(l).containsExactly("1A", "1B", "1C", "#"))
Expand Down
48 changes: 24 additions & 24 deletions reactor-core/src/test/java/reactor/core/publisher/HooksTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -805,20 +805,20 @@ public void onComplete() {
}));

StepVerifier.create(Flux.just(1, 2, 3)
.log()
.log())
.log("log", Level.FINE)
.log("log", Level.FINE))
.expectNext(2, 3, 4)
.verifyComplete();

StepVerifier.create(Mono.just(1)
.log()
.log())
.log("log", Level.FINE)
.log("log", Level.FINE))
.expectNext(2)
.verifyComplete();

StepVerifier.create(ParallelFlux.from(Mono.just(1), Mono.just(1))
.log()
.log())
.log("log", Level.FINE)
.log("log", Level.FINE))
.expectNext(2, 2)
.verifyComplete();
}
Expand Down Expand Up @@ -852,40 +852,40 @@ public void onComplete() {

StepVerifier.create(Flux.just(1, 2, 3)
.tag("metric", "test")
.log()
.log())
.log("log", Level.FINE)
.log("log", Level.FINE))
.expectNext(2, 3, 4)
.verifyComplete();

StepVerifier.create(Mono.just(1)
.tag("metric", "test")
.log()
.log())
.log("log", Level.FINE)
.log("log", Level.FINE))
.expectNext(2)
.verifyComplete();

StepVerifier.create(ParallelFlux.from(Mono.just(1), Mono.just(1))
.tag("metric", "test")
.log()
.log())
.log("log", Level.FINE)
.log("log", Level.FINE))
.expectNext(2, 2)
.verifyComplete();

StepVerifier.create(Flux.just(1, 2, 3)
.log()
.log())
.log("log", Level.FINE)
.log("log", Level.FINE))
.expectNext(1, 2, 3)
.verifyComplete();

StepVerifier.create(Mono.just(1)
.log()
.log())
.log("log", Level.FINE)
.log("log", Level.FINE))
.expectNext(1)
.verifyComplete();

StepVerifier.create(ParallelFlux.from(Mono.just(1), Mono.just(1))
.log()
.log())
.log("log", Level.FINE)
.log("log", Level.FINE))
.expectNext(1, 1)
.verifyComplete();
}
Expand Down Expand Up @@ -916,20 +916,20 @@ public void onComplete() {
}));

StepVerifier.create(Flux.just(1, 2, 3)
.log()
.log())
.log("log", Level.FINE)
.log("log", Level.FINE))
.expectNext(4, 5, 6)
.verifyComplete();

StepVerifier.create(Mono.just(1)
.log()
.log())
.log("log", Level.FINE)
.log("log", Level.FINE))
.expectNext(4)
.verifyComplete();

StepVerifier.create(ParallelFlux.from(Mono.just(1), Mono.just(1))
.log()
.log())
.log("log", Level.FINE)
.log("log", Level.FINE))
.expectNext(6, 6)
.verifyComplete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@
import reactor.core.Fuseable;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.util.Logger;
import reactor.util.Loggers;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

public class MonoPeekAfterTest {

private static final Logger LOG = Loggers.getLogger(MonoPeekAfterTest.class);

@Test
public void onSuccessNormal() {
LongAdder invoked = new LongAdder();
Expand Down Expand Up @@ -726,7 +730,7 @@ public void should_reduce_to_10_events() {
.subscribeOn(Schedulers.parallel())
.reduce((l, r) -> l + "_" + r)
.doOnSuccess(s -> {
System.out.println("success " + x + ": " + s);
LOG.debug("success " + x + ": " + s);
count.incrementAndGet();
}))
.blockLast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.function.Supplier;
import java.util.logging.Level;

import org.junit.Test;
import org.reactivestreams.Subscription;
Expand All @@ -45,7 +46,7 @@ public void collect() {
.collect(as, (a, b) -> a.add(b))
.sequential()
.flatMapIterable(v -> v)
.log()
.log("ParallelCollectTest#collect", Level.FINE)
.subscribe(ts);

ts.assertContainValues(new HashSet<>(Arrays.asList(1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import java.util.logging.Level;

import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
Expand Down Expand Up @@ -258,7 +259,7 @@ public void retryErrorPropagatedFromWorkQueueSubscriberCold() throws Exception {
wq.onNext(2);
wq.onNext(3);
wq.onComplete();
StepVerifier.create(wq.log()
StepVerifier.create(wq.log("wq", Level.FINE)
.doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
(s1, sink) -> {
if (errors.decrementAndGet() > 0) {
Expand Down Expand Up @@ -286,7 +287,7 @@ public void retryErrorPropagatedFromWorkQueueSubscriberHot() throws Exception {
StepVerifier.create(wq.doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
(s1, sink) -> {
if (errors.decrementAndGet() > 0) {
sink.error(new RuntimeException());
sink.error(new RuntimeException("expected"));
}
else {
sink.next(s1);
Expand Down Expand Up @@ -616,7 +617,7 @@ public void retryErrorPropagatedFromWorkQueueSubscriberHotPublishOn()
WorkQueueProcessor<Integer> wq = WorkQueueProcessor.<Integer>builder().autoCancel(false).build();
AtomicInteger onNextSignals = new AtomicInteger();

StepVerifier.create(wq.log()
StepVerifier.create(wq.log("wq", Level.FINE)
.publishOn(Schedulers.parallel())
.publish()
.autoConnect()
Expand All @@ -629,7 +630,7 @@ public void retryErrorPropagatedFromWorkQueueSubscriberHotPublishOn()
return s1;
}
})
.log()
.log("afterMap", Level.FINE)
.retry())
.then(() -> {
wq.onNext(1);
Expand Down Expand Up @@ -994,10 +995,6 @@ public void testCustomRequestTaskThreadNameCreate() {
customTaskExecutor.shutdownNow();
processor.forceShutdown();

for (Thread thread : threads) {
System.out.println(thread.getName());
}

Condition<Thread> customRequestTaskThread = new Condition<>(
thread -> expectedName.equals(thread.getName()),
"a thread named \"%s\"", expectedName);
Expand Down Expand Up @@ -1029,10 +1026,6 @@ public void testCustomRequestTaskThreadNameShare() {
customTaskExecutor.shutdownNow();
processor.forceShutdown();

for (Thread thread : threads) {
System.out.println(thread.getName());
}

Condition<Thread> customRequestTaskThread = new Condition<>(
thread -> expectedName.equals(thread.getName()),
"a thread named \"%s\"", expectedName);
Expand Down
Loading

0 comments on commit 14d607b

Please sign in to comment.