Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amb operators for Single and Completable should respect reactive contract 2.3 #3040

Merged
merged 3 commits into from
Aug 14, 2024

Conversation

daschl
Copy link
Contributor

@daschl daschl commented Aug 12, 2024

Motivation

Reactive Streams rule 2.3 mandates, that Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST NOT call any methods on the Subscription or the Publisher.

Our current amb operator implementation also performs a cancellation on the Single that just completed, which violates the rule.

Modifications

This changeset modifies the amb operator in a way that all but the one Single which got a termination signal (through onNext or onError) will get the cancellation propagated.

The test suite is enhanced to cover this scenario, both for ambWith as well as the static amb factory methods. Since the Completable also converts to Single underneath, the functionality is transitively available as well.

Result

The amb* operator variants adhere to reactive streams 2.3 rule.

Motivation
==========
Reactive Streams rule 2.3 mandates, that Subscriber.onComplete() and
Subscriber.onError(Throwable t) MUST NOT call any methods on the
Subscription or the Publisher.

Our current amb operator implementation also performs a cancellation
on the Single that just completed, which violates the rule.

Modifications
=============
This changeset modifies the amb operator in a way that all but the
one Single which got a termination signal (through onNext or onError)
will get the cancellation propagated.

The test suite is enhanced to cover this scenario, both for ambWith
as well as the static amb factory methods.

Result
======
The amb* operator variants adhere to reactive streams 2.3 rule.

@Override
public void cancel() {
if (propagateCancel) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@idelpivnitskiy did I understand you correctly that it needs to be both ways, not just the direction it is done here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we make the state around propagateCancel atomic via AtomicReferenceUpdater? otherwise will this be racy (unless we assume synchronization/sequencing is done externally)?

Copy link
Member

@Scottmitch Scottmitch Aug 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cancellation after terminal Subscriber method is OK from the spec perspective.

https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.6

If a Publisher signals either onError or onComplete on a Subscriber, that Subscriber’s Subscription MUST be considered cancelled.

https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#3.5

Subscription.cancel MUST respect the responsivity of its caller by returning in a timely manner, MUST be idempotent and MUST be thread-safe.

Copy link
Member

@idelpivnitskiy idelpivnitskiy Aug 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Scottmitch the way I read the posted rules doesn't mean that it's ok to explicitly call cancel() after terminal, it only says that subscription must be considered cancelled.

The rule 2.3 (see PR description) clarifies it clearly, that we should not call cancel() after we got terminal.

@daschl yes, protection as needed both ways, the say Scott described in his first message. I will look around if we can reuse any pre-existing atomic.

Copy link
Member

@Scottmitch Scottmitch Aug 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what is meant by "both ways" but wanted to make sure I understand the question/suggestion.

"idempotent" from 3.5 allows for multiple calls to cancel. 3.7 also talks to this but is superseded by 3.5. We don't always propagate multiple calls to cancel (bcz of idempotent and it maybe challenging with concurrency controls to track propagating multiple calls so propagating at least 1 should be sufficient)

2.3 says that after the Subscriber is terminated via onComplete/onError, that it shouldn't call Subscription.cancel(). However calling cancel() doesn't need to strictly prevent Subscriber onComplete/onError (see 2.8 and 3.12). We often make a best effort to propagate terminal state even if cancel happens (so downstream can get the Object and do w/e it was originally intending or cleanup bcz it knows what to do best).

https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#2.3

Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST NOT call any methods on the Subscription or the Publisher.

https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#3.7

After the Subscription is cancelled, additional Subscription.cancel() MUST be NOPs.

https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#2.8

A Subscriber MUST be prepared to receive one or more onNext signals after having called Subscription.cancel() if there are still requested elements pending [see 3.12]. Subscription.cancel() does not guarantee to perform the underlying cleaning operations immediately.

https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#3.12

While the Subscription is not cancelled, Subscription.cancel() MUST request the Publisher to eventually stop signaling its Subscriber. The operation is NOT REQUIRED to affect the Subscription immediately.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However calling cancel() doesn't need to strictly prevent Subscriber onComplete/onError (see 2.8 and 3.12).

💯, thanks for clarifying and bringing these points.
So, we should only prevent cancel after terminal but not terminal after cancel.

Copy link
Member

@idelpivnitskiy idelpivnitskiy Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@daschl worth adding a test if we don't have one already to make sure we can still receive complete/error (from first/second) after the result on amb operation is cancelled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarifications! I updated the PR to reuse the volatile from DelayedCancellable and added a test to make sure we still see a terminal signal after the cancellation.


@Override
public void cancel() {
if (propagateCancel) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However calling cancel() doesn't need to strictly prevent Subscriber onComplete/onError (see 2.8 and 3.12).

💯, thanks for clarifying and bringing these points.
So, we should only prevent cancel after terminal but not terminal after cancel.


@Override
public void cancel() {
if (propagateCancel) {
Copy link
Member

@idelpivnitskiy idelpivnitskiy Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@daschl worth adding a test if we don't have one already to make sure we can still receive complete/error (from first/second) after the result on amb operation is cancelled.

@@ -90,11 +90,13 @@ public void onSubscribe(final Cancellable cancellable) {

@Override
public void onSuccess(@Nullable final T result) {
ignoreCancel();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 cases to consider consider:

  1. Concurrency between Subscriber and Subscription for one AmbSubscriber object.
  2. Concurrency between two different AmbSubscriber when State invokes cancel.

for (1) the following sequence is possible:

T1 - Subscription.cancel(), DelayedCancellable.GAS currentUpdater reads Cancellable into oldCancellable
T2 - AmbSubscriber.onSuccess(T), DelayedCancellable.ignoreCancel(), state.trySuccess, target.onSuccess
T1 - oldCancellable.cancel()

Because they are on independent threads this could happen even with atomic operations (assuming we want to always let Subscriber methods propagate even after cancel) but worth adding a comment to clarify. Can you confirm if this would be a problem for this issue you are observing?

for (2) the following sequence is possible:

T1 - AmbSubscriber1.onSuccess, ignoreCancel, state.trySuccess, doneUpdater.GAS, CompositeCancellable.cancel()
T2 - AmbSubscriber2.onSuccess
T1 - AmbSubscriber2.cancel() // cancel already invoked, upstream but already delivered onSuccess
T2 - AmbSubscriber2.ignoreCancel() // cancel already invoked, this won't have any effect

Again because each Single can have a different thread for Subscriber and Subscription, and we want to give precedence to the Subscriber, I'm not sure we can prevent this. However worth noting (and add a comment) to confirm this won't cause an issue with your use case.

I suggest clarifying in comments the expectations and constraints:

  1. Let Subscriber take precedence - If Subscriber terminates after cancel(), we should propagate as we don't want to suppress work that is already complete and downstream may need to clean up.
  2. Best effort enforcement of 2.3 - Subscriber and Subscription can be on different threads, and because of (1) it is possible cancel may happen "after" Subscriber methods are invoked if concurrency is involved in the following scenarios:
  • Concurrency between Subscriber and Subscription for one AmbSubscriber object.
  • Concurrency between two different AmbSubscriber when State invokes cancel.

Copy link
Member

@idelpivnitskiy idelpivnitskiy Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the detailed description. Imho, both example are legit races that are true for other operators as well. For that type of a race, @bryce-anderson opened #3038.

Rule 2.3 says:

Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST NOT call any methods on the Subscription or the Publisher.

So, in this PR we only try to prevent a case when onComplete/onError invokes cancel() on the same thread after delivering the terminal. This helps to prevent a cycle, which is a side-effect of simplified logic around how we compose all cancellables together. The other way to fix this is to always compose other cancellables but not current, but it looked too complicated and requires more memory.

+1 for clarifying in comments.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assumption we currently have in all other places is that if cancel races with a terminal on different threads, we should do our best to propagate terminal down, but because we know that we can only do 1 terminal we should pessimistically assume that some other onError can win in a chain of operators and the onSuccess will be discarded. For this reason, we should always propagate a racy cancel up to the originator to make sure it can clean up the state, if necessary.

@idelpivnitskiy idelpivnitskiy changed the title Make sure Single#amb* variants respect reactive contract Amb operators for Single and Completable should respect reactive contract 2.3 Aug 13, 2024
Copy link
Contributor

@bryce-anderson bryce-anderson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this is fine wrt the intrinsic races: the only goal here is to prevent breaking rule 2.3.

…oncurrent/internal/DelayedCancellable.java

Co-authored-by: Bryce Anderson <bl_anderson@apple.com>
Copy link
Contributor

@bryce-anderson bryce-anderson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is package private right now I'm fine with just making the API contract more clear in the documentation.

@daschl daschl merged commit 2316f7c into apple:main Aug 14, 2024
11 checks passed
@daschl
Copy link
Contributor Author

daschl commented Aug 14, 2024

I'll follow up with the docs in a separate pr.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants