Skip to content

Commit

Permalink
rework tck and examples to follow 1.9 rule
Browse files Browse the repository at this point in the history
  • Loading branch information
OlegDokuka committed Jul 24, 2020
1 parent fc173b2 commit 352b397
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import java.util.Iterator;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
* AsyncIterablePublisher is an implementation of Reactive Streams `Publisher`
Expand Down Expand Up @@ -59,7 +59,6 @@ public void subscribe(final Subscriber<? super T> s) {
// These represent the protocol of the `AsyncIterablePublishers` SubscriptionImpls
static interface Signal {};
enum Cancel implements Signal { Instance; };
enum Subscribe implements Signal { Instance; };
enum Send implements Signal { Instance; };
static final class Request implements Signal {
final long n;
Expand Down Expand Up @@ -87,7 +86,7 @@ final class SubscriptionImpl implements Subscription, Runnable {

// We are using this `AtomicBoolean` to make sure that this `Subscription` doesn't run concurrently with itself,
// which would violate rule 1.3 among others (no concurrent notifications).
private final AtomicBoolean on = new AtomicBoolean(false);
private final AtomicInteger wip = new AtomicInteger(0);

// This method will register inbound demand from our `Subscriber` and validate it against rule 3.9 and rule 3.17
private void doRequest(final long n) {
Expand Down Expand Up @@ -205,46 +204,47 @@ private void signal(final Signal signal) {

// This is the main "event loop" if you so will
@Override public final void run() {
if(on.get()) { // establishes a happens-before relationship with the end of the previous run
try {
final Signal s = inboundSignals.poll(); // We take a signal off the queue
if (!cancelled) { // to make sure that we follow rule 1.8, 3.6 and 3.7
int remainingWork = 1;
for (;;) {

// Below we simply unpack the `Signal`s and invoke the corresponding methods
if (s instanceof Request)
doRequest(((Request)s).n);
else if (s == Send.Instance)
doSend();
else if (s == Cancel.Instance)
doCancel();
else if (s == Subscribe.Instance)
doSubscribe();
Signal s;
while ((s = inboundSignals.poll()) != null) {
if (cancelled) { // to make sure that we follow rule 1.8, 3.6 and 3.7
return;
}
} finally {
on.set(false); // establishes a happens-before relationship with the beginning of the next run
if(!inboundSignals.isEmpty()) // If we still have signals to process
tryScheduleToExecute(); // Then we try to schedule ourselves to execute again

// Below we simply unpack the `Signal`s and invoke the corresponding methods
if (s instanceof Request)
doRequest(((Request) s).n);
else if (s == Send.Instance)
doSend();
else if (s == Cancel.Instance)
doCancel();
}

remainingWork = wip.addAndGet(-remainingWork); // establishes a happens-before relationship with the beginning of the next run
if (remainingWork == 0) {
return;
}
}
}

// This method makes sure that this `Subscription` is only running on one Thread at a time,
// this is important to make sure that we follow rule 1.3
private final void tryScheduleToExecute() {
if(on.compareAndSet(false, true)) {
try {
executor.execute(this);
} catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully
if (!cancelled) {
doCancel(); // First of all, this failure is not recoverable, so we need to follow rule 1.4 and 1.6
try {
terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
} finally {
inboundSignals.clear(); // We're not going to need these anymore
// This subscription is cancelled by now, but letting it become schedulable again means
// that we can drain the inboundSignals queue if anything arrives after clearing
on.set(false);
}
if (wip.getAndIncrement() != 0) { // ensure happens-before with already running work
return;
}

try {
executor.execute(this);
} catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully
if (!cancelled) {
doCancel(); // First of all, this failure is not recoverable, so we need to follow rule 1.4 and 1.6
try {
terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
} finally {
inboundSignals.clear(); // We're not going to need these anymore
}
}
}
Expand All @@ -263,7 +263,7 @@ private final void tryScheduleToExecute() {
// method is only intended to be invoked once, and immediately after the constructor has
// finished.
void init() {
signal(Subscribe.Instance);
doSubscribe();
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,11 @@ public void onSubscribe(Subscription s) {
concurrentAccessBarrier.enterSignal(signal);

subs = s;
subs.request(1);

concurrentAccessBarrier.leaveSignal(signal);

//request after leave signal since request may be offloaded so it is going to be false positive racing
subs.request(1);
}

@Override
Expand Down Expand Up @@ -1099,7 +1101,7 @@ public void onNext(T element) {
}
}
};
env.subscribe(pub, sub, env.defaultTimeoutMillis());
env.subscribe(pub, sub);

// eventually triggers `onNext`, which will then trigger up to `callsCounter` times `request(Long.MAX_VALUE - 1)`
// we're pretty sure to overflow from those
Expand Down
14 changes: 3 additions & 11 deletions tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -297,28 +297,20 @@ public <T> T flopAndFail(String msg) {


public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException {
subscribe(pub, sub, defaultTimeoutMillis);
}

public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException {
pub.subscribe(sub);
sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub));
sub.subscription.expectCompletion(0, String.format("Could not subscribe %s to Publisher %s", sub, pub));
verifyNoAsyncErrorsNoDelay();
}

public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException {
ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(this);
subscribe(pub, sub, defaultTimeoutMillis());
subscribe(pub, sub);
return sub;
}

public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub) throws InterruptedException {
return newManualSubscriber(pub, defaultTimeoutMillis());
}

public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub, long timeoutMillis) throws InterruptedException {
ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(this);
subscribe(pub, sub, timeoutMillis);
subscribe(pub, sub);
return sub;
}

Expand Down

0 comments on commit 352b397

Please sign in to comment.