-
Notifications
You must be signed in to change notification settings - Fork 181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Amb
operators for Single
and Completable
should respect reactive contract 2.3
#3040
Conversation
Motivation ========== Reactive Streams rule 2.3 mandates, that Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST NOT call any methods on the Subscription or the Publisher. Our current amb operator implementation also performs a cancellation on the Single that just completed, which violates the rule. Modifications ============= This changeset modifies the amb operator in a way that all but the one Single which got a termination signal (through onNext or onError) will get the cancellation propagated. The test suite is enhanced to cover this scenario, both for ambWith as well as the static amb factory methods. Result ====== The amb* operator variants adhere to reactive streams 2.3 rule.
|
||
@Override | ||
public void cancel() { | ||
if (propagateCancel) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@idelpivnitskiy did I understand you correctly that it needs to be both ways, not just the direction it is done here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we make the state around propagateCancel
atomic via AtomicReferenceUpdater? otherwise will this be racy (unless we assume synchronization/sequencing is done externally)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cancellation after terminal Subscriber method is OK from the spec perspective.
https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.6
If a Publisher signals either onError or onComplete on a Subscriber, that Subscriber’s Subscription MUST be considered cancelled.
https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#3.5
Subscription.cancel MUST respect the responsivity of its caller by returning in a timely manner, MUST be idempotent and MUST be thread-safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Scottmitch the way I read the posted rules doesn't mean that it's ok to explicitly call cancel()
after terminal, it only says that subscription must be considered cancelled.
The rule 2.3 (see PR description) clarifies it clearly, that we should not call cancel()
after we got terminal.
@daschl yes, protection as needed both ways, the say Scott described in his first message. I will look around if we can reuse any pre-existing atomic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what is meant by "both ways" but wanted to make sure I understand the question/suggestion.
"idempotent" from 3.5 allows for multiple calls to cancel. 3.7 also talks to this but is superseded by 3.5. We don't always propagate multiple calls to cancel (bcz of idempotent and it maybe challenging with concurrency controls to track propagating multiple calls so propagating at least 1 should be sufficient)
2.3 says that after the Subscriber is terminated via onComplete/onError, that it shouldn't call Subscription.cancel(). However calling cancel()
doesn't need to strictly prevent Subscriber onComplete/onError (see 2.8 and 3.12). We often make a best effort to propagate terminal state even if cancel happens (so downstream can get the Object and do w/e it was originally intending or cleanup bcz it knows what to do best).
https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#2.3
Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST NOT call any methods on the Subscription or the Publisher.
https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#3.7
After the Subscription is cancelled, additional Subscription.cancel() MUST be NOPs.
https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#2.8
A Subscriber MUST be prepared to receive one or more onNext signals after having called Subscription.cancel() if there are still requested elements pending [see 3.12]. Subscription.cancel() does not guarantee to perform the underlying cleaning operations immediately.
https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#3.12
While the Subscription is not cancelled, Subscription.cancel() MUST request the Publisher to eventually stop signaling its Subscriber. The operation is NOT REQUIRED to affect the Subscription immediately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However calling cancel() doesn't need to strictly prevent Subscriber onComplete/onError (see 2.8 and 3.12).
💯, thanks for clarifying and bringing these points.
So, we should only prevent cancel after terminal but not terminal after cancel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@daschl worth adding a test if we don't have one already to make sure we can still receive complete/error (from first/second) after the result on amb operation is cancelled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the clarifications! I updated the PR to reuse the volatile from DelayedCancellable
and added a test to make sure we still see a terminal signal after the cancellation.
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AmbSingles.java
Outdated
Show resolved
Hide resolved
servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/SingleAmbTest.java
Outdated
Show resolved
Hide resolved
servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/SingleAmbTest.java
Outdated
Show resolved
Hide resolved
...etalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/SingleAmbWithAsyncTest.java
Outdated
Show resolved
Hide resolved
|
||
@Override | ||
public void cancel() { | ||
if (propagateCancel) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However calling cancel() doesn't need to strictly prevent Subscriber onComplete/onError (see 2.8 and 3.12).
💯, thanks for clarifying and bringing these points.
So, we should only prevent cancel after terminal but not terminal after cancel.
|
||
@Override | ||
public void cancel() { | ||
if (propagateCancel) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@daschl worth adding a test if we don't have one already to make sure we can still receive complete/error (from first/second) after the result on amb operation is cancelled.
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AmbSingles.java
Outdated
Show resolved
Hide resolved
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AmbSingles.java
Outdated
Show resolved
Hide resolved
servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/SingleAmbTest.java
Show resolved
Hide resolved
@@ -90,11 +90,13 @@ public void onSubscribe(final Cancellable cancellable) { | |||
|
|||
@Override | |||
public void onSuccess(@Nullable final T result) { | |||
ignoreCancel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are 2 cases to consider consider:
- Concurrency between Subscriber and Subscription for one AmbSubscriber object.
- Concurrency between two different AmbSubscriber when
State
invokes cancel.
for (1) the following sequence is possible:
T1 - Subscription.cancel(), DelayedCancellable.GAS currentUpdater reads Cancellable into oldCancellable
T2 - AmbSubscriber.onSuccess(T), DelayedCancellable.ignoreCancel(), state.trySuccess, target.onSuccess
T1 - oldCancellable.cancel()
Because they are on independent threads this could happen even with atomic operations (assuming we want to always let Subscriber methods propagate even after cancel) but worth adding a comment to clarify. Can you confirm if this would be a problem for this issue you are observing?
for (2) the following sequence is possible:
T1 - AmbSubscriber1.onSuccess, ignoreCancel, state.trySuccess, doneUpdater.GAS, CompositeCancellable.cancel()
T2 - AmbSubscriber2.onSuccess
T1 - AmbSubscriber2.cancel() // cancel already invoked, upstream but already delivered onSuccess
T2 - AmbSubscriber2.ignoreCancel() // cancel already invoked, this won't have any effect
Again because each Single can have a different thread for Subscriber and Subscription, and we want to give precedence to the Subscriber, I'm not sure we can prevent this. However worth noting (and add a comment) to confirm this won't cause an issue with your use case.
I suggest clarifying in comments the expectations and constraints:
- Let Subscriber take precedence - If Subscriber terminates after cancel(), we should propagate as we don't want to suppress work that is already complete and downstream may need to clean up.
- Best effort enforcement of 2.3 - Subscriber and Subscription can be on different threads, and because of (1) it is possible cancel may happen "after" Subscriber methods are invoked if concurrency is involved in the following scenarios:
- Concurrency between Subscriber and Subscription for one AmbSubscriber object.
- Concurrency between two different AmbSubscriber when
State
invokes cancel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the detailed description. Imho, both example are legit races that are true for other operators as well. For that type of a race, @bryce-anderson opened #3038.
Rule 2.3 says:
Subscriber.onComplete()
andSubscriber.onError(Throwable t)
MUST NOT call any methods on the Subscription or the Publisher.
So, in this PR we only try to prevent a case when onComplete/onError
invokes cancel()
on the same thread after delivering the terminal. This helps to prevent a cycle, which is a side-effect of simplified logic around how we compose all cancellables together. The other way to fix this is to always compose other cancellables but not current, but it looked too complicated and requires more memory.
+1 for clarifying in comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The assumption we currently have in all other places is that if cancel races with a terminal on different threads, we should do our best to propagate terminal down, but because we know that we can only do 1 terminal we should pessimistically assume that some other onError
can win in a chain of operators and the onSuccess
will be discarded. For this reason, we should always propagate a racy cancel
up to the originator to make sure it can clean up the state, if necessary.
Amb
operators for Single
and Completable
should respect reactive contract 2.3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that this is fine wrt the intrinsic races: the only goal here is to prevent breaking rule 2.3.
...concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/DelayedCancellable.java
Outdated
Show resolved
Hide resolved
...concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/DelayedCancellable.java
Outdated
Show resolved
Hide resolved
…oncurrent/internal/DelayedCancellable.java Co-authored-by: Bryce Anderson <bl_anderson@apple.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is package private right now I'm fine with just making the API contract more clear in the documentation.
...concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/DelayedCancellable.java
Show resolved
Hide resolved
I'll follow up with the docs in a separate pr. |
Motivation
Reactive Streams rule 2.3 mandates, that
Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST NOT call any methods on the Subscription or the Publisher
.Our current amb operator implementation also performs a cancellation on the Single that just completed, which violates the rule.
Modifications
This changeset modifies the amb operator in a way that all but the one Single which got a termination signal (through
onNext
oronError
) will get the cancellation propagated.The test suite is enhanced to cover this scenario, both for
ambWith
as well as the staticamb
factory methods. Since theCompletable
also converts toSingle
underneath, the functionality is transitively available as well.Result
The
amb*
operator variants adhere to reactive streams 2.3 rule.