Skip to content

Commit

Permalink
Ensure SwitchOnFirst serializes requests per rule 2.7 (reactor#2161)
Browse files Browse the repository at this point in the history
SwitchOnFirst is now based on DeferredSubscription, which was
previously improved to ensure rule 2.7 compliance.

See: reactor#2164
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka authored May 20, 2020
1 parent c543523 commit 1b25ccf
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@
final class FluxSwitchOnFirst<T, R> extends InternalFluxOperator<T, R> {


static final int STATE_CANCELLED = -2;
static final int STATE_SUBSCRIBED = -1;

final BiFunction<Signal<? extends T>, Flux<T>, Publisher<? extends R>> transformer;
final boolean cancelSourceOnComplete;

Expand Down Expand Up @@ -435,19 +432,13 @@ boolean tryOnNext(CoreSubscriber<? super T> actual, T t) {
}
}

static final class SwitchOnFirstControlSubscriber<T> implements InnerOperator<T, T>, ControlSubscriber<T> {
static final class SwitchOnFirstControlSubscriber<T> extends Operators.DeferredSubscription implements InnerOperator<T,
T>, ControlSubscriber<T> {

final AbstractSwitchOnFirstMain<?, T> parent;
final CoreSubscriber<? super T> delegate;
final boolean cancelSourceOnComplete;

volatile long requested;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<SwitchOnFirstControlSubscriber> REQUESTED =
AtomicLongFieldUpdater.newUpdater(SwitchOnFirstControlSubscriber.class, "requested");

Subscription s;

SwitchOnFirstControlSubscriber(
AbstractSwitchOnFirstMain<?, T> parent,
CoreSubscriber<? super T> delegate,
Expand All @@ -464,15 +455,7 @@ public void sendSubscription() {

@Override
public void onSubscribe(Subscription s) {
final long state = this.requested;
if (this.s == null && state != STATE_CANCELLED) {
this.s = s;

this.tryRequest();
}
else {
s.cancel();
}
set(s);
}

@Override
Expand Down Expand Up @@ -514,61 +497,6 @@ public void onComplete() {
this.delegate.onComplete();
}

@Override
public void request(long n) {
long r = this.requested; // volatile read beforehand

if (r > STATE_SUBSCRIBED) { // works only in case onSubscribe has not happened
long u;
for (;;) { // normal CAS loop with overflow protection
if (r == Long.MAX_VALUE) { // if r == Long.MAX_VALUE then we dont care and we can loose this request just in case of racing
return;
}
u = Operators.addCap(r, n);
if (REQUESTED.compareAndSet(this, r, u)) { // Means increment happened before onSubscribe
return;
}
else { // Means increment happened after onSubscribe
r = this.requested; // update new state to see what exactly happened (onSubscribe | cancel | requestN)

if (r < 0) { // check state (expect -1 | -2 to exit, otherwise repeat)
break;
}
}
}
}

if (r == STATE_CANCELLED) { // if canceled, just exit
return;
}

this.s.request(n); // if onSubscribe -> subscription exists (and we sure of that becuase volatile read after volatile write) so we can execute requestN on the subscription
}

void tryRequest() {
final Subscription s = this.s;

long r;

for (;;) {
r = this.requested;

// prevents cancelled state replacement
if (r == STATE_CANCELLED) {
s.cancel();
return;
}

if (REQUESTED.compareAndSet(this, r, STATE_SUBSCRIBED)) {
break;
}
}

if (r > 0) { // if there is something,
s.request(r); // then we do a request on the given subscription
}
}

@Override
public void cancel() {
final long state = REQUESTED.getAndSet(this, STATE_CANCELLED);
Expand All @@ -577,11 +505,7 @@ public void cancel() {
}

if (state == STATE_SUBSCRIBED) {
try {
this.s.cancel();
} catch (Throwable t) {
t.printStackTrace();
}
this.s.cancel();
}

this.parent.cancel();
Expand All @@ -596,20 +520,13 @@ public Object scanUnsafe(Attr key) {
}
}

static final class SwitchOnFirstConditionalControlSubscriber<T> implements InnerOperator<T, T>, ControlSubscriber<T>,
static final class SwitchOnFirstConditionalControlSubscriber<T> extends Operators.DeferredSubscription implements InnerOperator<T, T>, ControlSubscriber<T>,
Fuseable.ConditionalSubscriber<T> {

final AbstractSwitchOnFirstMain<?, T> parent;
final Fuseable.ConditionalSubscriber<? super T> delegate;
final boolean terminateUpstreamOnComplete;

volatile long requested;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<SwitchOnFirstConditionalControlSubscriber> REQUESTED =
AtomicLongFieldUpdater.newUpdater(SwitchOnFirstConditionalControlSubscriber.class, "requested");

Subscription s;

SwitchOnFirstConditionalControlSubscriber(
AbstractSwitchOnFirstMain<?, T> parent,
Fuseable.ConditionalSubscriber<? super T> delegate,
Expand All @@ -626,15 +543,7 @@ public void sendSubscription() {

@Override
public void onSubscribe(Subscription s) {
final long state = this.requested;
if (this.s == null && state != STATE_CANCELLED) {
this.s = s;

this.tryRequest();
}
else {
s.cancel();
}
set(s);
}

@Override
Expand Down Expand Up @@ -681,60 +590,6 @@ public void onComplete() {
this.delegate.onComplete();
}

@Override
public void request(long n) {
long r = this.requested; // volatile read beforehand

if (r > STATE_SUBSCRIBED) { // works only in case onSubscribe has not happened
long u;
for (;;) { // normal CAS loop with overflow protection
if (r == Long.MAX_VALUE) { // if r == Long.MAX_VALUE then we dont care and we can loose this request just in case of racing
return;
}
u = Operators.addCap(r, n);
if (REQUESTED.compareAndSet(this, r, u)) { // Means increment happened before onSubscribe
return;
}
else { // Means increment happened after onSubscribe
r = this.requested; // update new state to see what exactly happened (onSubscribe | cancel | requestN)

if (r < 0) { // check state (expect -1 | -2 to exit, otherwise repeat)
break;
}
}
}
}

if (r == STATE_CANCELLED) { // if canceled, just exit
return;
}

this.s.request(n); // if onSubscribe -> subscription exists (and we sure of that becuase volatile read after volatile write) so we can execute requestN on the subscription
}

void tryRequest() {
final Subscription s = this.s;

long r;

for (;;) {
r = this.requested;

if (r == STATE_CANCELLED) {
s.cancel();
return;
}

if (REQUESTED.compareAndSet(this, r, STATE_SUBSCRIBED)) {
break;
}
}

if (r > 0) { // if there is something,
s.request(r); // then we do a request on the given subscription
}
}

@Override
public void cancel() {
final long state = REQUESTED.getAndSet(this, STATE_CANCELLED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1627,6 +1627,44 @@ else if (longArgumentCaptor.getAllValues().size() == 2) {
}
}

@SuppressWarnings("rawtypes")
@Test
public void unitRequestsAreSerialTest() {
@SuppressWarnings("unchecked")
BiFunction<FluxSwitchOnFirst.AbstractSwitchOnFirstMain, CoreSubscriber, InnerOperator>[] factories = new BiFunction[] {
(parent, assertSubscriber) -> new FluxSwitchOnFirst.SwitchOnFirstControlSubscriber((FluxSwitchOnFirst.AbstractSwitchOnFirstMain) parent, (CoreSubscriber) assertSubscriber, true),
(parent, assertSubscriber) -> new FluxSwitchOnFirst.SwitchOnFirstConditionalControlSubscriber((FluxSwitchOnFirst.AbstractSwitchOnFirstMain) parent, (Fuseable.ConditionalSubscriber) assertSubscriber, true)
};
for (BiFunction<FluxSwitchOnFirst.AbstractSwitchOnFirstMain, CoreSubscriber, InnerOperator> factory : factories) {
for (int i = 0; i < 100000; i++) {
long[] valueHolder = new long[] { 0 };
FluxSwitchOnFirst.AbstractSwitchOnFirstMain mockParent = Mockito.mock(FluxSwitchOnFirst.AbstractSwitchOnFirstMain.class);
Mockito.doNothing().when(mockParent).request(Mockito.anyLong());
Mockito.doNothing().when(mockParent).cancel();
Subscription mockSubscription = Mockito.mock(Subscription.class);
Mockito.doAnswer((a) -> valueHolder[0] += (long) a.getArgument(0)).when(mockSubscription).request(Mockito.anyLong());
Mockito.doNothing().when(mockSubscription).cancel();
AssertSubscriber<Object> subscriber = AssertSubscriber.create(0);
InnerOperator switchOnFirstControlSubscriber = factory.apply(mockParent, Operators.toConditionalSubscriber(subscriber));

switchOnFirstControlSubscriber.request(10);
RaceTestUtils.race(() -> {
switchOnFirstControlSubscriber.request(10);
switchOnFirstControlSubscriber.request(10);
switchOnFirstControlSubscriber.request(10);
switchOnFirstControlSubscriber.request(10);
},
() -> switchOnFirstControlSubscriber.onSubscribe(mockSubscription),
Schedulers.parallel());

switchOnFirstControlSubscriber.request(10);
Assertions.assertThat(valueHolder[0])
.isEqualTo(60L);
mockSubscription.toString();
}
}
}

@SuppressWarnings("rawtypes")
@Test
public void unitCancelRacingTest() {
Expand Down

0 comments on commit 1b25ccf

Please sign in to comment.