From 1b25ccf626a06928193bfca4ae3dfa9f4d779f2d Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Wed, 20 May 2020 17:23:41 +0300 Subject: [PATCH] Ensure SwitchOnFirst serializes requests per rule 2.7 (#2161) SwitchOnFirst is now based on DeferredSubscription, which was previously improved to ensure rule 2.7 compliance. See: #2164 Signed-off-by: Oleh Dokuka --- .../core/publisher/FluxSwitchOnFirst.java | 157 +----------------- .../core/publisher/FluxSwitchOnFirstTest.java | 38 +++++ 2 files changed, 44 insertions(+), 151 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchOnFirst.java b/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchOnFirst.java index 09be4acc8e..4cc87dc2f9 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchOnFirst.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchOnFirst.java @@ -37,9 +37,6 @@ final class FluxSwitchOnFirst extends InternalFluxOperator { - static final int STATE_CANCELLED = -2; - static final int STATE_SUBSCRIBED = -1; - final BiFunction, Flux, Publisher> transformer; final boolean cancelSourceOnComplete; @@ -435,19 +432,13 @@ boolean tryOnNext(CoreSubscriber actual, T t) { } } - static final class SwitchOnFirstControlSubscriber implements InnerOperator, ControlSubscriber { + static final class SwitchOnFirstControlSubscriber extends Operators.DeferredSubscription implements InnerOperator, ControlSubscriber { final AbstractSwitchOnFirstMain parent; final CoreSubscriber delegate; final boolean cancelSourceOnComplete; - volatile long requested; - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED = - AtomicLongFieldUpdater.newUpdater(SwitchOnFirstControlSubscriber.class, "requested"); - - Subscription s; - SwitchOnFirstControlSubscriber( AbstractSwitchOnFirstMain parent, CoreSubscriber delegate, @@ -464,15 +455,7 @@ public void sendSubscription() { @Override public void onSubscribe(Subscription s) { - final long state = this.requested; - if (this.s == null && state != STATE_CANCELLED) { - this.s = s; - - this.tryRequest(); - } - else { - s.cancel(); - } + set(s); } @Override @@ -514,61 +497,6 @@ public void onComplete() { this.delegate.onComplete(); } - @Override - public void request(long n) { - long r = this.requested; // volatile read beforehand - - if (r > STATE_SUBSCRIBED) { // works only in case onSubscribe has not happened - long u; - for (;;) { // normal CAS loop with overflow protection - if (r == Long.MAX_VALUE) { // if r == Long.MAX_VALUE then we dont care and we can loose this request just in case of racing - return; - } - u = Operators.addCap(r, n); - if (REQUESTED.compareAndSet(this, r, u)) { // Means increment happened before onSubscribe - return; - } - else { // Means increment happened after onSubscribe - r = this.requested; // update new state to see what exactly happened (onSubscribe | cancel | requestN) - - if (r < 0) { // check state (expect -1 | -2 to exit, otherwise repeat) - break; - } - } - } - } - - if (r == STATE_CANCELLED) { // if canceled, just exit - return; - } - - this.s.request(n); // if onSubscribe -> subscription exists (and we sure of that becuase volatile read after volatile write) so we can execute requestN on the subscription - } - - void tryRequest() { - final Subscription s = this.s; - - long r; - - for (;;) { - r = this.requested; - - // prevents cancelled state replacement - if (r == STATE_CANCELLED) { - s.cancel(); - return; - } - - if (REQUESTED.compareAndSet(this, r, STATE_SUBSCRIBED)) { - break; - } - } - - if (r > 0) { // if there is something, - s.request(r); // then we do a request on the given subscription - } - } - @Override public void cancel() { final long state = REQUESTED.getAndSet(this, STATE_CANCELLED); @@ -577,11 +505,7 @@ public void cancel() { } if (state == STATE_SUBSCRIBED) { - try { - this.s.cancel(); - } catch (Throwable t) { - t.printStackTrace(); - } + this.s.cancel(); } this.parent.cancel(); @@ -596,20 +520,13 @@ public Object scanUnsafe(Attr key) { } } - static final class SwitchOnFirstConditionalControlSubscriber implements InnerOperator, ControlSubscriber, + static final class SwitchOnFirstConditionalControlSubscriber extends Operators.DeferredSubscription implements InnerOperator, ControlSubscriber, Fuseable.ConditionalSubscriber { final AbstractSwitchOnFirstMain parent; final Fuseable.ConditionalSubscriber delegate; final boolean terminateUpstreamOnComplete; - volatile long requested; - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED = - AtomicLongFieldUpdater.newUpdater(SwitchOnFirstConditionalControlSubscriber.class, "requested"); - - Subscription s; - SwitchOnFirstConditionalControlSubscriber( AbstractSwitchOnFirstMain parent, Fuseable.ConditionalSubscriber delegate, @@ -626,15 +543,7 @@ public void sendSubscription() { @Override public void onSubscribe(Subscription s) { - final long state = this.requested; - if (this.s == null && state != STATE_CANCELLED) { - this.s = s; - - this.tryRequest(); - } - else { - s.cancel(); - } + set(s); } @Override @@ -681,60 +590,6 @@ public void onComplete() { this.delegate.onComplete(); } - @Override - public void request(long n) { - long r = this.requested; // volatile read beforehand - - if (r > STATE_SUBSCRIBED) { // works only in case onSubscribe has not happened - long u; - for (;;) { // normal CAS loop with overflow protection - if (r == Long.MAX_VALUE) { // if r == Long.MAX_VALUE then we dont care and we can loose this request just in case of racing - return; - } - u = Operators.addCap(r, n); - if (REQUESTED.compareAndSet(this, r, u)) { // Means increment happened before onSubscribe - return; - } - else { // Means increment happened after onSubscribe - r = this.requested; // update new state to see what exactly happened (onSubscribe | cancel | requestN) - - if (r < 0) { // check state (expect -1 | -2 to exit, otherwise repeat) - break; - } - } - } - } - - if (r == STATE_CANCELLED) { // if canceled, just exit - return; - } - - this.s.request(n); // if onSubscribe -> subscription exists (and we sure of that becuase volatile read after volatile write) so we can execute requestN on the subscription - } - - void tryRequest() { - final Subscription s = this.s; - - long r; - - for (;;) { - r = this.requested; - - if (r == STATE_CANCELLED) { - s.cancel(); - return; - } - - if (REQUESTED.compareAndSet(this, r, STATE_SUBSCRIBED)) { - break; - } - } - - if (r > 0) { // if there is something, - s.request(r); // then we do a request on the given subscription - } - } - @Override public void cancel() { final long state = REQUESTED.getAndSet(this, STATE_CANCELLED); diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxSwitchOnFirstTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxSwitchOnFirstTest.java index c10678be00..86d0a77188 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxSwitchOnFirstTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxSwitchOnFirstTest.java @@ -1627,6 +1627,44 @@ else if (longArgumentCaptor.getAllValues().size() == 2) { } } + @SuppressWarnings("rawtypes") + @Test + public void unitRequestsAreSerialTest() { + @SuppressWarnings("unchecked") + BiFunction[] factories = new BiFunction[] { + (parent, assertSubscriber) -> new FluxSwitchOnFirst.SwitchOnFirstControlSubscriber((FluxSwitchOnFirst.AbstractSwitchOnFirstMain) parent, (CoreSubscriber) assertSubscriber, true), + (parent, assertSubscriber) -> new FluxSwitchOnFirst.SwitchOnFirstConditionalControlSubscriber((FluxSwitchOnFirst.AbstractSwitchOnFirstMain) parent, (Fuseable.ConditionalSubscriber) assertSubscriber, true) + }; + for (BiFunction factory : factories) { + for (int i = 0; i < 100000; i++) { + long[] valueHolder = new long[] { 0 }; + FluxSwitchOnFirst.AbstractSwitchOnFirstMain mockParent = Mockito.mock(FluxSwitchOnFirst.AbstractSwitchOnFirstMain.class); + Mockito.doNothing().when(mockParent).request(Mockito.anyLong()); + Mockito.doNothing().when(mockParent).cancel(); + Subscription mockSubscription = Mockito.mock(Subscription.class); + Mockito.doAnswer((a) -> valueHolder[0] += (long) a.getArgument(0)).when(mockSubscription).request(Mockito.anyLong()); + Mockito.doNothing().when(mockSubscription).cancel(); + AssertSubscriber subscriber = AssertSubscriber.create(0); + InnerOperator switchOnFirstControlSubscriber = factory.apply(mockParent, Operators.toConditionalSubscriber(subscriber)); + + switchOnFirstControlSubscriber.request(10); + RaceTestUtils.race(() -> { + switchOnFirstControlSubscriber.request(10); + switchOnFirstControlSubscriber.request(10); + switchOnFirstControlSubscriber.request(10); + switchOnFirstControlSubscriber.request(10); + }, + () -> switchOnFirstControlSubscriber.onSubscribe(mockSubscription), + Schedulers.parallel()); + + switchOnFirstControlSubscriber.request(10); + Assertions.assertThat(valueHolder[0]) + .isEqualTo(60L); + mockSubscription.toString(); + } + } + } + @SuppressWarnings("rawtypes") @Test public void unitCancelRacingTest() {