Skip to content

Commit

Permalink
fix reactor#1142 Dangling thread when calling WorkQueueProcessor.forc…
Browse files Browse the repository at this point in the history
…eShutdown
  • Loading branch information
simonbasle authored May 1, 2018
1 parent 66051ff commit 7bde523
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -443,11 +443,13 @@ void halt() {
}

boolean isRunning() {
return running.get() && (processor.terminated == 0 || processor.error == null &&
processor.ringBuffer.getAsLong() > sequence.getAsLong());
return running.get() && (processor.terminated == 0 ||
(processor.terminated != FORCED_SHUTDOWN &&
processor.error == null &&
processor.ringBuffer.getAsLong() > sequence.getAsLong())
);
}


/**
* It is ok to have another thread rerun this method after a halt().
*/
Expand All @@ -474,13 +476,18 @@ public void run() {
if(!running.get()){
return;
}
if(processor.terminated == 1 && processor.ringBuffer.getAsLong() == -1L) {
if (processor.error != null) {
subscriber.onError(processor.error);
if(processor.terminated == SHUTDOWN) {
if (processor.ringBuffer.getAsLong() == -1L) {
if (processor.error != null) {
subscriber.onError(processor.error);
return;
}
subscriber.onComplete();
return;
}
subscriber.onComplete();
return;
}
else if (processor.terminated == FORCED_SHUTDOWN) {
return;
}
}

Expand Down Expand Up @@ -536,6 +543,9 @@ public void run() {

}
catch (InterruptedException | RuntimeException ce) {
if (ce instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (Exceptions.isCancel(ce)){
reschedule(event);
break;
Expand All @@ -548,7 +558,7 @@ public void run() {
if (!running.get()) {
break;
}
if(processor.terminated == 1) {
if(processor.terminated == SHUTDOWN) {
if (processor.error != null) {
processedSequence = true;
subscriber.onError(processor.error);
Expand All @@ -560,6 +570,9 @@ public void run() {
break;
}
}
else if (processor.terminated == FORCED_SHUTDOWN) {
break;
}
//processedSequence = true;
//continue event-loop

Expand Down Expand Up @@ -639,6 +652,7 @@ boolean replay(final boolean unbounded) {
}
catch (InterruptedException e) {
running.set(false);
Thread.currentThread().interrupt();
return true;
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
Expand All @@ -46,6 +47,7 @@
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
Expand Down Expand Up @@ -1528,4 +1530,30 @@ public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
}

@Test
public void testForceShutdownAfterShutdown() throws InterruptedException {
WorkQueueProcessor<String> processor = WorkQueueProcessor.<String>builder()
.name("processor").bufferSize(4)
.waitStrategy(WaitStrategy.phasedOffLiteLock(200, 100, TimeUnit.MILLISECONDS)) //eliminate the waitstrategy diff
.build();
Publisher<String> publisher = Flux.fromArray(new String[] { "1", "2", "3", "4", "5" });
publisher.subscribe(processor);

AssertSubscriber<String> subscriber = AssertSubscriber.create(0);
processor.subscribe(subscriber);

subscriber.request(1);

Thread.sleep(250);

processor.shutdown();

assertFalse(processor.awaitAndShutdown(Duration.ofMillis(400)));

processor.forceShutdown();

assertTrue(processor.awaitAndShutdown(Duration.ofMillis(400)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ public class TopicProcessorVerification extends AbstractProcessorVerification {

@Override
public Processor<Long, Long> createIdentityProcessor(int bufferSize) {
return TopicProcessor.<Long>builder().name("rb-async")
.bufferSize(bufferSize)
.build();
return TopicProcessor.<Long>builder()
.name("topicProcessorVerification")
.bufferSize(bufferSize)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ public class WorkQueueProcessorVerification extends AbstractProcessorVerificatio

@Override
public Processor<Long, Long> createIdentityProcessor(int bufferSize) {
return WorkQueueProcessor.<Long>builder().name("rb-work").bufferSize(bufferSize).build();
return WorkQueueProcessor.<Long>builder()
.name("workQueueProcessorVerification")
.bufferSize(bufferSize)
.build();
}

@Override
public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo()
throws Throwable {
throw new SkipException("WorkQueueProcessor cannot do that given its " +
"distributing nature");
throw new SkipException("WorkQueueProcessor cannot do that given its distributing nature");
}
}

0 comments on commit 7bde523

Please sign in to comment.