Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amb operators for Single and Completable should respect reactive contract 2.3 #3040

Merged
merged 3 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ public void onSubscribe(final Cancellable cancellable) {

@Override
public void onSuccess(@Nullable final T result) {
ignoreCancel();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 cases to consider consider:

  1. Concurrency between Subscriber and Subscription for one AmbSubscriber object.
  2. Concurrency between two different AmbSubscriber when State invokes cancel.

for (1) the following sequence is possible:

T1 - Subscription.cancel(), DelayedCancellable.GAS currentUpdater reads Cancellable into oldCancellable
T2 - AmbSubscriber.onSuccess(T), DelayedCancellable.ignoreCancel(), state.trySuccess, target.onSuccess
T1 - oldCancellable.cancel()

Because they are on independent threads this could happen even with atomic operations (assuming we want to always let Subscriber methods propagate even after cancel) but worth adding a comment to clarify. Can you confirm if this would be a problem for this issue you are observing?

for (2) the following sequence is possible:

T1 - AmbSubscriber1.onSuccess, ignoreCancel, state.trySuccess, doneUpdater.GAS, CompositeCancellable.cancel()
T2 - AmbSubscriber2.onSuccess
T1 - AmbSubscriber2.cancel() // cancel already invoked, upstream but already delivered onSuccess
T2 - AmbSubscriber2.ignoreCancel() // cancel already invoked, this won't have any effect

Again because each Single can have a different thread for Subscriber and Subscription, and we want to give precedence to the Subscriber, I'm not sure we can prevent this. However worth noting (and add a comment) to confirm this won't cause an issue with your use case.

I suggest clarifying in comments the expectations and constraints:

  1. Let Subscriber take precedence - If Subscriber terminates after cancel(), we should propagate as we don't want to suppress work that is already complete and downstream may need to clean up.
  2. Best effort enforcement of 2.3 - Subscriber and Subscription can be on different threads, and because of (1) it is possible cancel may happen "after" Subscriber methods are invoked if concurrency is involved in the following scenarios:
  • Concurrency between Subscriber and Subscription for one AmbSubscriber object.
  • Concurrency between two different AmbSubscriber when State invokes cancel.

Copy link
Member

@idelpivnitskiy idelpivnitskiy Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the detailed description. Imho, both example are legit races that are true for other operators as well. For that type of a race, @bryce-anderson opened #3038.

Rule 2.3 says:

Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST NOT call any methods on the Subscription or the Publisher.

So, in this PR we only try to prevent a case when onComplete/onError invokes cancel() on the same thread after delivering the terminal. This helps to prevent a cycle, which is a side-effect of simplified logic around how we compose all cancellables together. The other way to fix this is to always compose other cancellables but not current, but it looked too complicated and requires more memory.

+1 for clarifying in comments.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assumption we currently have in all other places is that if cancel races with a terminal on different threads, we should do our best to propagate terminal down, but because we know that we can only do 1 terminal we should pessimistically assume that some other onError can win in a chain of operators and the onSuccess will be discarded. For this reason, we should always propagate a racy cancel up to the originator to make sure it can clean up the state, if necessary.

state.trySuccess(result);
}

@Override
public void onError(final Throwable t) {
ignoreCancel();
state.tryError(t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.servicetalk.concurrent.test.internal.TestCompletableSubscriber;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

Expand All @@ -32,10 +31,9 @@

class CompletableAmbTest {

private TestCompletable first;
private TestCompletable second;
private TestCompletableSubscriber subscriber;
private TestCancellable cancellable;
private final TestCompletable first = new TestCompletable();
private final TestCompletable second = new TestCompletable();
private final TestCompletableSubscriber subscriber = new TestCompletableSubscriber();

private enum AmbParam {
AMB_WITH {
Expand All @@ -60,14 +58,6 @@ BiFunction<Completable, Completable, Completable> get() {
abstract BiFunction<Completable, Completable, Completable> get();
}

@BeforeEach
void beforeEach() {
first = new TestCompletable();
second = new TestCompletable();
subscriber = new TestCompletableSubscriber();
cancellable = new TestCancellable();
}

private void setUp(final BiFunction<Completable, Completable, Completable> ambSupplier) {
toSource(ambSupplier.apply(first, second)).subscribe(subscriber);
subscriber.awaitSubscription();
Expand All @@ -81,6 +71,7 @@ void successFirst(final AmbParam ambParam) {
setUp(ambParam.get());
sendSuccessToAndVerify(first);
verifyCancelled(second);
verifyNotCancelled(first);
}

@ParameterizedTest(name = "{displayName} [{index}] {arguments}")
Expand All @@ -89,6 +80,7 @@ void successSecond(final AmbParam ambParam) {
setUp(ambParam.get());
sendSuccessToAndVerify(second);
verifyCancelled(first);
verifyNotCancelled(second);
}

@ParameterizedTest(name = "{displayName} [{index}] {arguments}")
Expand All @@ -97,6 +89,7 @@ void failFirst(final AmbParam ambParam) {
setUp(ambParam.get());
sendErrorToAndVerify(first);
verifyCancelled(second);
verifyNotCancelled(first);
}

@ParameterizedTest(name = "{displayName} [{index}] {arguments}")
Expand All @@ -105,6 +98,7 @@ void failSecond(final AmbParam ambParam) {
setUp(ambParam.get());
sendErrorToAndVerify(second);
verifyCancelled(first);
verifyNotCancelled(second);
}

@ParameterizedTest(name = "{displayName} [{index}] {arguments}")
Expand All @@ -113,6 +107,7 @@ void successFirstThenSecond(final AmbParam ambParam) {
setUp(ambParam.get());
sendSuccessToAndVerify(first);
verifyCancelled(second);
verifyNotCancelled(first);
second.onComplete();
}

Expand All @@ -122,6 +117,7 @@ void successSecondThenFirst(final AmbParam ambParam) {
setUp(ambParam.get());
sendSuccessToAndVerify(second);
verifyCancelled(first);
verifyNotCancelled(second);
first.onComplete();
}

Expand All @@ -131,6 +127,7 @@ void failFirstThenSecond(final AmbParam ambParam) {
setUp(ambParam.get());
sendErrorToAndVerify(first);
verifyCancelled(second);
verifyNotCancelled(first);
second.onError(DELIBERATE_EXCEPTION);
}

Expand All @@ -140,6 +137,7 @@ void failSecondThenFirst(final AmbParam ambParam) {
setUp(ambParam.get());
sendErrorToAndVerify(second);
verifyCancelled(first);
verifyNotCancelled(second);
first.onError(DELIBERATE_EXCEPTION);
}

Expand All @@ -149,6 +147,7 @@ void successFirstThenSecondFail(final AmbParam ambParam) {
setUp(ambParam.get());
sendSuccessToAndVerify(first);
verifyCancelled(second);
verifyNotCancelled(first);
second.onError(DELIBERATE_EXCEPTION);
}

Expand All @@ -158,6 +157,7 @@ void successSecondThenFirstFail(final AmbParam ambParam) {
setUp(ambParam.get());
sendSuccessToAndVerify(second);
verifyCancelled(first);
verifyNotCancelled(second);
first.onError(DELIBERATE_EXCEPTION);
}

Expand All @@ -167,6 +167,7 @@ void failFirstThenSecondSuccess(final AmbParam ambParam) {
setUp(ambParam.get());
sendErrorToAndVerify(first);
verifyCancelled(second);
verifyNotCancelled(first);
second.onComplete();
}

Expand All @@ -176,6 +177,7 @@ void failSecondThenFirstSuccess(final AmbParam ambParam) {
setUp(ambParam.get());
sendErrorToAndVerify(second);
verifyCancelled(first);
verifyNotCancelled(second);
first.onComplete();
}

Expand All @@ -189,8 +191,15 @@ private void sendErrorToAndVerify(final TestCompletable source) {
assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
}

private void verifyCancelled(final TestCompletable other) {
other.onSubscribe(cancellable);
assertThat("Other source not cancelled.", cancellable.isCancelled(), is(true));
private static void verifyNotCancelled(final TestCompletable completable) {
final TestCancellable cancellable = new TestCancellable();
completable.onSubscribe(cancellable);
assertThat("Completable cancelled when no cancellation was expected.", cancellable.isCancelled(), is(false));
}

private static void verifyCancelled(final TestCompletable completable) {
final TestCancellable cancellable = new TestCancellable();
completable.onSubscribe(cancellable);
assertThat("Completable not cancelled, but cancellation was expected.", cancellable.isCancelled(), is(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.test.internal.TestSingleSubscriber;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

import static io.servicetalk.concurrent.api.Single.amb;
Expand All @@ -35,7 +40,6 @@ class SingleAmbTest {
private final TestSingle<Integer> first = new TestSingle<>();
private final TestSingle<Integer> second = new TestSingle<>();
private final TestSingleSubscriber<Integer> subscriber = new TestSingleSubscriber<>();
private final TestCancellable cancellable = new TestCancellable();

private enum AmbParam {
AMB_WITH {
Expand Down Expand Up @@ -72,6 +76,7 @@ private void init(final AmbParam ambParam) {
void successFirst(final AmbParam ambParam) {
init(ambParam);
sendSuccessToAndVerify(first);
verifyNotCancelled(first);
daschl marked this conversation as resolved.
Show resolved Hide resolved
verifyCancelled(second);
}

Expand All @@ -80,6 +85,7 @@ void successFirst(final AmbParam ambParam) {
void successSecond(final AmbParam ambParam) {
init(ambParam);
sendSuccessToAndVerify(second);
verifyNotCancelled(second);
verifyCancelled(first);
}

Expand All @@ -88,6 +94,7 @@ void successSecond(final AmbParam ambParam) {
void failFirst(final AmbParam ambParam) {
init(ambParam);
sendErrorToAndVerify(first);
verifyNotCancelled(first);
verifyCancelled(second);
}

Expand All @@ -96,6 +103,7 @@ void failFirst(final AmbParam ambParam) {
void failSecond(final AmbParam ambParam) {
init(ambParam);
sendErrorToAndVerify(second);
verifyNotCancelled(second);
verifyCancelled(first);
}

Expand All @@ -104,6 +112,7 @@ void failSecond(final AmbParam ambParam) {
void successFirstThenSecond(final AmbParam ambParam) {
init(ambParam);
sendSuccessToAndVerify(first);
verifyNotCancelled(first);
verifyCancelled(second);
second.onSuccess(2);
}
Expand All @@ -113,6 +122,7 @@ void successFirstThenSecond(final AmbParam ambParam) {
void successSecondThenFirst(final AmbParam ambParam) {
init(ambParam);
sendSuccessToAndVerify(second);
verifyNotCancelled(second);
verifyCancelled(first);
first.onSuccess(2);
}
Expand All @@ -122,6 +132,7 @@ void successSecondThenFirst(final AmbParam ambParam) {
void failFirstThenSecond(final AmbParam ambParam) {
init(ambParam);
sendErrorToAndVerify(first);
verifyNotCancelled(first);
verifyCancelled(second);
second.onError(DELIBERATE_EXCEPTION);
}
Expand All @@ -131,6 +142,7 @@ void failFirstThenSecond(final AmbParam ambParam) {
void failSecondThenFirst(final AmbParam ambParam) {
init(ambParam);
sendErrorToAndVerify(second);
verifyNotCancelled(second);
verifyCancelled(first);
first.onError(DELIBERATE_EXCEPTION);
}
Expand All @@ -140,6 +152,7 @@ void failSecondThenFirst(final AmbParam ambParam) {
void successFirstThenSecondFail(final AmbParam ambParam) {
init(ambParam);
sendSuccessToAndVerify(first);
verifyNotCancelled(first);
verifyCancelled(second);
second.onError(DELIBERATE_EXCEPTION);
}
Expand All @@ -149,6 +162,7 @@ void successFirstThenSecondFail(final AmbParam ambParam) {
void successSecondThenFirstFail(final AmbParam ambParam) {
init(ambParam);
sendSuccessToAndVerify(second);
verifyNotCancelled(second);
verifyCancelled(first);
first.onError(DELIBERATE_EXCEPTION);
}
Expand All @@ -158,6 +172,7 @@ void successSecondThenFirstFail(final AmbParam ambParam) {
void failFirstThenSecondSuccess(final AmbParam ambParam) {
init(ambParam);
sendErrorToAndVerify(first);
verifyNotCancelled(first);
verifyCancelled(second);
second.onSuccess(2);
}
Expand All @@ -167,10 +182,49 @@ void failFirstThenSecondSuccess(final AmbParam ambParam) {
void failSecondThenFirstSuccess(final AmbParam ambParam) {
init(ambParam);
sendErrorToAndVerify(second);
verifyNotCancelled(second);
verifyCancelled(first);
first.onSuccess(2);
}

@ParameterizedTest(name = "{displayName} [{index}] index={0}")
@ValueSource(ints = {0, 1, 2, 3})
void doNotCancelCompletedWithMoreSingles(final int index) {
final List<TestSingle<Integer>> singles = Arrays.asList(new TestSingle<>(), new TestSingle<>(),
new TestSingle<>(), new TestSingle<>());

final TestSingleSubscriber<Integer> subscriber = new TestSingleSubscriber<>();
toSource(amb(singles.get(0), singles.get(1), singles.get(2), singles.get(3))).subscribe(subscriber);
subscriber.awaitSubscription();

final TestSingle<Integer> terminates = singles.get(index);
terminates.onSuccess(1);
assertThat("Unexpected result.", subscriber.awaitOnSuccess(), is(1));

verifyNotCancelled(terminates);
for (int i = 0; i < singles.size(); i++) {
if (i != index) {
verifyCancelled(singles.get(i));
}
}
}

@ParameterizedTest(name = "{displayName} [{index}] error={0}, completeFirst={1}")
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
void receiveTerminationSignalAfterCancel(final boolean error, final boolean completeFirst) {
toSource(amb(first, second)).subscribe(subscriber);
final Cancellable subscription = subscriber.awaitSubscription();
subscription.cancel();

verifyCancelled(first);
verifyCancelled(second);
if (error) {
sendErrorToAndVerify(completeFirst ? first : second);
} else {
sendSuccessToAndVerify(completeFirst ? first : second);
}
}

private void sendSuccessToAndVerify(final TestSingle<Integer> source) {
source.onSuccess(1);
assertThat("Unexpected result.", subscriber.awaitOnSuccess(), is(1));
Expand All @@ -181,8 +235,15 @@ private void sendErrorToAndVerify(final TestSingle<Integer> source) {
assertThat("Unexpected error result.", subscriber.awaitOnError(), is(sameInstance(DELIBERATE_EXCEPTION)));
}

private void verifyCancelled(final TestSingle<Integer> other) {
other.onSubscribe(cancellable);
assertThat("Other source not cancelled.", cancellable.isCancelled(), is(true));
private static void verifyNotCancelled(final TestSingle<Integer> single) {
final TestCancellable cancellable = new TestCancellable();
single.onSubscribe(cancellable);
assertThat("Single cancelled when no cancellation was expected.", cancellable.isCancelled(), is(false));
}

private static void verifyCancelled(final TestSingle<Integer> single) {
final TestCancellable cancellable = new TestCancellable();
single.onSubscribe(cancellable);
assertThat("Single not cancelled, but cancellation was expected.", cancellable.isCancelled(), is(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,11 @@ public void cancel() {
oldCancellable.cancel();
}
}

/**
* Ignores any subsequent calls to {@link #cancel()}, preventing propagating the cancellation further up the stream.
*/
daschl marked this conversation as resolved.
Show resolved Hide resolved
protected void ignoreCancel() {
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
daschl marked this conversation as resolved.
Show resolved Hide resolved
currentUpdater.set(this, IGNORE_CANCEL);
}
}
Loading