Skip to content

Commit

Permalink
Amb operators for Single and Completable should respect reactiv…
Browse files Browse the repository at this point in the history
…e contract 2.3 (#3040)

Motivation
==========
Reactive Streams rule 2.3 mandates, that Subscriber.onComplete() and
Subscriber.onError(Throwable t) MUST NOT call any methods on the
Subscription or the Publisher.

Our current amb operator implementation also performs a cancellation
on the Single that just completed, which violates the rule.

Modifications
=============
This changeset modifies the amb operator in a way that all but the
one Single which got a termination signal (through onNext or onError)
will get the cancellation propagated.

The test suite is enhanced to cover this scenario, both for ambWith
as well as the static amb factory methods.

Result
======
The amb* operator variants adhere to reactive streams 2.3 rule.

Co-authored-by: Idel Pivnitskiy <idel.pivnitskiy@apple.com>
Co-authored-by: Bryce Anderson <bl_anderson@apple.com>
  • Loading branch information
3 people authored Aug 14, 2024
1 parent d09421a commit 2316f7c
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ public void onSubscribe(final Cancellable cancellable) {

@Override
public void onSuccess(@Nullable final T result) {
ignoreCancel();
state.trySuccess(result);
}

@Override
public void onError(final Throwable t) {
ignoreCancel();
state.tryError(t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.servicetalk.concurrent.test.internal.TestCompletableSubscriber;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

Expand All @@ -32,10 +31,9 @@

class CompletableAmbTest {

private TestCompletable first;
private TestCompletable second;
private TestCompletableSubscriber subscriber;
private TestCancellable cancellable;
private final TestCompletable first = new TestCompletable();
private final TestCompletable second = new TestCompletable();
private final TestCompletableSubscriber subscriber = new TestCompletableSubscriber();

private enum AmbParam {
AMB_WITH {
Expand All @@ -60,14 +58,6 @@ BiFunction<Completable, Completable, Completable> get() {
abstract BiFunction<Completable, Completable, Completable> get();
}

@BeforeEach
void beforeEach() {
first = new TestCompletable();
second = new TestCompletable();
subscriber = new TestCompletableSubscriber();
cancellable = new TestCancellable();
}

private void setUp(final BiFunction<Completable, Completable, Completable> ambSupplier) {
toSource(ambSupplier.apply(first, second)).subscribe(subscriber);
subscriber.awaitSubscription();
Expand All @@ -81,6 +71,7 @@ void successFirst(final AmbParam ambParam) {
setUp(ambParam.get());
sendSuccessToAndVerify(first);
verifyCancelled(second);
verifyNotCancelled(first);
}

@ParameterizedTest(name = "{displayName} [{index}] {arguments}")
Expand All @@ -89,6 +80,7 @@ void successSecond(final AmbParam ambParam) {
setUp(ambParam.get());
sendSuccessToAndVerify(second);
verifyCancelled(first);
verifyNotCancelled(second);
}

@ParameterizedTest(name = "{displayName} [{index}] {arguments}")
Expand All @@ -97,6 +89,7 @@ void failFirst(final AmbParam ambParam) {
setUp(ambParam.get());
sendErrorToAndVerify(first);
verifyCancelled(second);
verifyNotCancelled(first);
}

@ParameterizedTest(name = "{displayName} [{index}] {arguments}")
Expand All @@ -105,6 +98,7 @@ void failSecond(final AmbParam ambParam) {
setUp(ambParam.get());
sendErrorToAndVerify(second);
verifyCancelled(first);
verifyNotCancelled(second);
}

@ParameterizedTest(name = "{displayName} [{index}] {arguments}")
Expand All @@ -113,6 +107,7 @@ void successFirstThenSecond(final AmbParam ambParam) {
setUp(ambParam.get());
sendSuccessToAndVerify(first);
verifyCancelled(second);
verifyNotCancelled(first);
second.onComplete();
}

Expand All @@ -122,6 +117,7 @@ void successSecondThenFirst(final AmbParam ambParam) {
setUp(ambParam.get());
sendSuccessToAndVerify(second);
verifyCancelled(first);
verifyNotCancelled(second);
first.onComplete();
}

Expand All @@ -131,6 +127,7 @@ void failFirstThenSecond(final AmbParam ambParam) {
setUp(ambParam.get());
sendErrorToAndVerify(first);
verifyCancelled(second);
verifyNotCancelled(first);
second.onError(DELIBERATE_EXCEPTION);
}

Expand All @@ -140,6 +137,7 @@ void failSecondThenFirst(final AmbParam ambParam) {
setUp(ambParam.get());
sendErrorToAndVerify(second);
verifyCancelled(first);
verifyNotCancelled(second);
first.onError(DELIBERATE_EXCEPTION);
}

Expand All @@ -149,6 +147,7 @@ void successFirstThenSecondFail(final AmbParam ambParam) {
setUp(ambParam.get());
sendSuccessToAndVerify(first);
verifyCancelled(second);
verifyNotCancelled(first);
second.onError(DELIBERATE_EXCEPTION);
}

Expand All @@ -158,6 +157,7 @@ void successSecondThenFirstFail(final AmbParam ambParam) {
setUp(ambParam.get());
sendSuccessToAndVerify(second);
verifyCancelled(first);
verifyNotCancelled(second);
first.onError(DELIBERATE_EXCEPTION);
}

Expand All @@ -167,6 +167,7 @@ void failFirstThenSecondSuccess(final AmbParam ambParam) {
setUp(ambParam.get());
sendErrorToAndVerify(first);
verifyCancelled(second);
verifyNotCancelled(first);
second.onComplete();
}

Expand All @@ -176,6 +177,7 @@ void failSecondThenFirstSuccess(final AmbParam ambParam) {
setUp(ambParam.get());
sendErrorToAndVerify(second);
verifyCancelled(first);
verifyNotCancelled(second);
first.onComplete();
}

Expand All @@ -189,8 +191,15 @@ private void sendErrorToAndVerify(final TestCompletable source) {
assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
}

private void verifyCancelled(final TestCompletable other) {
other.onSubscribe(cancellable);
assertThat("Other source not cancelled.", cancellable.isCancelled(), is(true));
private static void verifyNotCancelled(final TestCompletable completable) {
final TestCancellable cancellable = new TestCancellable();
completable.onSubscribe(cancellable);
assertThat("Completable cancelled when no cancellation was expected.", cancellable.isCancelled(), is(false));
}

private static void verifyCancelled(final TestCompletable completable) {
final TestCancellable cancellable = new TestCancellable();
completable.onSubscribe(cancellable);
assertThat("Completable not cancelled, but cancellation was expected.", cancellable.isCancelled(), is(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.test.internal.TestSingleSubscriber;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

import static io.servicetalk.concurrent.api.Single.amb;
Expand All @@ -35,7 +40,6 @@ class SingleAmbTest {
private final TestSingle<Integer> first = new TestSingle<>();
private final TestSingle<Integer> second = new TestSingle<>();
private final TestSingleSubscriber<Integer> subscriber = new TestSingleSubscriber<>();
private final TestCancellable cancellable = new TestCancellable();

private enum AmbParam {
AMB_WITH {
Expand Down Expand Up @@ -72,6 +76,7 @@ private void init(final AmbParam ambParam) {
void successFirst(final AmbParam ambParam) {
init(ambParam);
sendSuccessToAndVerify(first);
verifyNotCancelled(first);
verifyCancelled(second);
}

Expand All @@ -80,6 +85,7 @@ void successFirst(final AmbParam ambParam) {
void successSecond(final AmbParam ambParam) {
init(ambParam);
sendSuccessToAndVerify(second);
verifyNotCancelled(second);
verifyCancelled(first);
}

Expand All @@ -88,6 +94,7 @@ void successSecond(final AmbParam ambParam) {
void failFirst(final AmbParam ambParam) {
init(ambParam);
sendErrorToAndVerify(first);
verifyNotCancelled(first);
verifyCancelled(second);
}

Expand All @@ -96,6 +103,7 @@ void failFirst(final AmbParam ambParam) {
void failSecond(final AmbParam ambParam) {
init(ambParam);
sendErrorToAndVerify(second);
verifyNotCancelled(second);
verifyCancelled(first);
}

Expand All @@ -104,6 +112,7 @@ void failSecond(final AmbParam ambParam) {
void successFirstThenSecond(final AmbParam ambParam) {
init(ambParam);
sendSuccessToAndVerify(first);
verifyNotCancelled(first);
verifyCancelled(second);
second.onSuccess(2);
}
Expand All @@ -113,6 +122,7 @@ void successFirstThenSecond(final AmbParam ambParam) {
void successSecondThenFirst(final AmbParam ambParam) {
init(ambParam);
sendSuccessToAndVerify(second);
verifyNotCancelled(second);
verifyCancelled(first);
first.onSuccess(2);
}
Expand All @@ -122,6 +132,7 @@ void successSecondThenFirst(final AmbParam ambParam) {
void failFirstThenSecond(final AmbParam ambParam) {
init(ambParam);
sendErrorToAndVerify(first);
verifyNotCancelled(first);
verifyCancelled(second);
second.onError(DELIBERATE_EXCEPTION);
}
Expand All @@ -131,6 +142,7 @@ void failFirstThenSecond(final AmbParam ambParam) {
void failSecondThenFirst(final AmbParam ambParam) {
init(ambParam);
sendErrorToAndVerify(second);
verifyNotCancelled(second);
verifyCancelled(first);
first.onError(DELIBERATE_EXCEPTION);
}
Expand All @@ -140,6 +152,7 @@ void failSecondThenFirst(final AmbParam ambParam) {
void successFirstThenSecondFail(final AmbParam ambParam) {
init(ambParam);
sendSuccessToAndVerify(first);
verifyNotCancelled(first);
verifyCancelled(second);
second.onError(DELIBERATE_EXCEPTION);
}
Expand All @@ -149,6 +162,7 @@ void successFirstThenSecondFail(final AmbParam ambParam) {
void successSecondThenFirstFail(final AmbParam ambParam) {
init(ambParam);
sendSuccessToAndVerify(second);
verifyNotCancelled(second);
verifyCancelled(first);
first.onError(DELIBERATE_EXCEPTION);
}
Expand All @@ -158,6 +172,7 @@ void successSecondThenFirstFail(final AmbParam ambParam) {
void failFirstThenSecondSuccess(final AmbParam ambParam) {
init(ambParam);
sendErrorToAndVerify(first);
verifyNotCancelled(first);
verifyCancelled(second);
second.onSuccess(2);
}
Expand All @@ -167,10 +182,49 @@ void failFirstThenSecondSuccess(final AmbParam ambParam) {
void failSecondThenFirstSuccess(final AmbParam ambParam) {
init(ambParam);
sendErrorToAndVerify(second);
verifyNotCancelled(second);
verifyCancelled(first);
first.onSuccess(2);
}

@ParameterizedTest(name = "{displayName} [{index}] index={0}")
@ValueSource(ints = {0, 1, 2, 3})
void doNotCancelCompletedWithMoreSingles(final int index) {
final List<TestSingle<Integer>> singles = Arrays.asList(new TestSingle<>(), new TestSingle<>(),
new TestSingle<>(), new TestSingle<>());

final TestSingleSubscriber<Integer> subscriber = new TestSingleSubscriber<>();
toSource(amb(singles.get(0), singles.get(1), singles.get(2), singles.get(3))).subscribe(subscriber);
subscriber.awaitSubscription();

final TestSingle<Integer> terminates = singles.get(index);
terminates.onSuccess(1);
assertThat("Unexpected result.", subscriber.awaitOnSuccess(), is(1));

verifyNotCancelled(terminates);
for (int i = 0; i < singles.size(); i++) {
if (i != index) {
verifyCancelled(singles.get(i));
}
}
}

@ParameterizedTest(name = "{displayName} [{index}] error={0}, completeFirst={1}")
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
void receiveTerminationSignalAfterCancel(final boolean error, final boolean completeFirst) {
toSource(amb(first, second)).subscribe(subscriber);
final Cancellable subscription = subscriber.awaitSubscription();
subscription.cancel();

verifyCancelled(first);
verifyCancelled(second);
if (error) {
sendErrorToAndVerify(completeFirst ? first : second);
} else {
sendSuccessToAndVerify(completeFirst ? first : second);
}
}

private void sendSuccessToAndVerify(final TestSingle<Integer> source) {
source.onSuccess(1);
assertThat("Unexpected result.", subscriber.awaitOnSuccess(), is(1));
Expand All @@ -181,8 +235,15 @@ private void sendErrorToAndVerify(final TestSingle<Integer> source) {
assertThat("Unexpected error result.", subscriber.awaitOnError(), is(sameInstance(DELIBERATE_EXCEPTION)));
}

private void verifyCancelled(final TestSingle<Integer> other) {
other.onSubscribe(cancellable);
assertThat("Other source not cancelled.", cancellable.isCancelled(), is(true));
private static void verifyNotCancelled(final TestSingle<Integer> single) {
final TestCancellable cancellable = new TestCancellable();
single.onSubscribe(cancellable);
assertThat("Single cancelled when no cancellation was expected.", cancellable.isCancelled(), is(false));
}

private static void verifyCancelled(final TestSingle<Integer> single) {
final TestCancellable cancellable = new TestCancellable();
single.onSubscribe(cancellable);
assertThat("Single not cancelled, but cancellation was expected.", cancellable.isCancelled(), is(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,11 @@ public void cancel() {
oldCancellable.cancel();
}
}

/**
* Ignores any subsequent calls to {@link #cancel()}, preventing propagating the cancellation further up the stream.
*/
protected final void ignoreCancel() {
currentUpdater.set(this, IGNORE_CANCEL);
}
}

0 comments on commit 2316f7c

Please sign in to comment.