Skip to content

Commit

Permalink
Fix reactor#990 eventual premature termination issue with WorkQueuePr…
Browse files Browse the repository at this point in the history
…ocessor

* reactivate (on release) TestNG task, less verbose TCK tests, per-test resources
* Fix premature termination inconsistency on WorkQueueProcessor
  • Loading branch information
smaldini authored Jan 2, 2018
1 parent 6c0bde7 commit 4b2fd19
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 36 deletions.
15 changes: 8 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,14 @@ project('reactor-core') {
}

task testNG(type: Test, group: 'verification') {
useTestNG()

//FIXME smaldini the testNG test randomly fail on local, fail on CI
exclude '**/*.*'
includes.clear()
include '**/*Verification.*'
doFirst {
println "Additional tests from `testNG` ($includes)"
if (!version.endsWith('BUILD-SNAPSHOT')) {
includes.clear()
useTestNG()
include '**/*Verification.*'
doFirst {
println "Additional tests from `testNG` ($includes)"
}
}
}

Expand Down Expand Up @@ -389,6 +389,7 @@ project('reactor-core') {

test.dependsOn testStaticInit
jacocoTestReport.dependsOn testNG

check.dependsOn jacocoTestReport
jar.finalizedBy(japicmp)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.concurrent.WaitStrategy;
import reactor.util.context.Context;

/**
* A base processor used by executor backed processors to take care of their ExecutorService
Expand Down Expand Up @@ -411,9 +412,9 @@ final public boolean isTerminated() {
final public void onComplete() {
if (TERMINATED.compareAndSet(this, 0, SHUTDOWN)) {
upstreamSubscription = null;
doComplete();
executor.shutdown();
readWait.signalAllWhenBlocking();
doComplete();
}
}

Expand All @@ -423,9 +424,12 @@ final public void onError(Throwable t) {
if (TERMINATED.compareAndSet(this, 0, SHUTDOWN)) {
error = t;
upstreamSubscription = null;
doError(t);
executor.shutdown();
readWait.signalAllWhenBlocking();
doError(t);
}
else {
Operators.onErrorDropped(t, Context.empty());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,10 @@ public void run() {
nextSequence = sequence.getAsLong();
Slot<T> event = null;

final boolean unbounded = pendingRequest.getAsLong() == Long.MAX_VALUE;

if (!EventLoopProcessor.waitRequestOrTerminalEvent(pendingRequest, barrier, running, sequence,
waiter)) {
waiter) && replay(unbounded)) {
if(!running.get()){
return;
}
Expand All @@ -482,13 +484,6 @@ public void run() {
}
}

final boolean unbounded = pendingRequest.getAsLong() == Long.MAX_VALUE;

if (replay(unbounded)) {
running.set(false);
return;
}

while (true) {
try {
// if previous sequence was processed - fetch the next sequence and push
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public abstract class AbstractFluxVerification
final int batch = 1024;

AbstractFluxVerification() {
super(new TestEnvironment(true));
super(new TestEnvironment(false));
}

abstract Flux<Integer> transformFlux(Flux<Integer> f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@
*/
public abstract class AbstractProcessorVerification extends org.reactivestreams.tck.IdentityProcessorVerification<Long> {

final ExecutorService executorService = Executors.newCachedThreadPool();
// final ExecutorService executorService = Executors.newCachedThreadPool();

@Override
public ExecutorService publisherExecutorService() {
return executorService;
// return executorService;
return Executors.newCachedThreadPool();
}

AbstractProcessorVerification() {
super(new TestEnvironment(500, envDefaultNoSignalsTimeoutMillis(), true));
super(new TestEnvironment(500, envDefaultNoSignalsTimeoutMillis(), false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package reactor.core.publisher.tck;

import java.util.logging.Level;

import org.reactivestreams.Processor;
import org.testng.SkipException;
import reactor.core.publisher.EmitterProcessor;
Expand All @@ -29,7 +31,7 @@ public class EmitterProcessorVerification extends AbstractProcessorVerification
@Override
public Processor<Long, Long> createIdentityProcessor(int bufferSize) {
FluxProcessor<Long, Long> p = EmitterProcessor.create(bufferSize);
return FluxProcessor.wrap(p, p.log());
return FluxProcessor.wrap(p, p.log("EmitterProcessorVerification", Level.FINE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import java.time.Duration;
import java.util.function.BiFunction;

import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
Expand All @@ -31,13 +31,12 @@
@org.testng.annotations.Test
public class FluxBlackboxProcessorVerification extends AbstractFluxVerification {

static Scheduler sharedGroup;
private Scheduler sharedGroup;

@Override
Flux<Integer> transformFlux(Flux<Integer> f) {

Flux<String> otherStream = Flux.just("test", "test2", "test3");
System.out.println("Providing new downstream");
// System.out.println("Providing new downstream");

Scheduler asyncGroup = Schedulers.newParallel("flux-p-tck", 2);

Expand All @@ -61,17 +60,14 @@ Flux<Integer> transformFlux(Flux<Integer> f) {
.doOnError(Throwable::printStackTrace);
}



@BeforeClass
public static void setupGlobal(){
@BeforeMethod
public void init() {
sharedGroup = Schedulers.newParallel("fluxion-tck", 2);
}

@AfterClass
public static void tearDownGlobal(){
@AfterMethod
public void tearDown(){
sharedGroup.dispose();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package reactor.core.publisher.tck;

import java.util.logging.Level;

import org.reactivestreams.Publisher;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;
Expand Down Expand Up @@ -47,7 +49,7 @@ else if(cursor == elements){

.map(data -> data * 10)
.map( data -> data / 10)
.log("log-test");
.log("log-test", Level.FINE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class FluxWithProcessorVerification extends AbstractProcessorVerification
@Override
public Processor<Long, Long> createIdentityProcessor(int bufferSize) {
Flux<String> otherStream = Flux.just("test", "test2", "test3");
System.out.println("Providing new downstream");
// System.out.println("Providing new downstream");
FluxProcessor<Long, Long> p =
WorkQueueProcessor.<Long>builder().name("fluxion-raw-fork").bufferSize(bufferSize).build();

Expand Down

0 comments on commit 4b2fd19

Please sign in to comment.