Skip to content

2.x: Document and test amb subscription ordering. #5047

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public abstract class Completable implements CompletableSource {
* <dt><b>Scheduler:</b></dt>
* <dd>{@code ambArray} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the array of source Completables
* @param sources the array of source Completables. A subscription to each source will
* occur in the same order as in this array.
* @return the new Completable instance
* @throws NullPointerException if sources is null
*/
Expand All @@ -71,7 +72,8 @@ public static Completable ambArray(final CompletableSource... sources) {
* <dt><b>Scheduler:</b></dt>
* <dd>{@code amb} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the array of source Completables
* @param sources the array of source Completables. A subscription to each source will
* occur in the same order as in this Iterable.
* @return the new Completable instance
* @throws NullPointerException if sources is null
*/
Expand Down Expand Up @@ -776,7 +778,8 @@ public static Completable wrap(CompletableSource source) {
* <dt><b>Scheduler:</b></dt>
* <dd>{@code ambWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param other the other Completable, not null
* @param other the other Completable, not null. A subscription to this provided source will occur after subscribing
* to the current source.
* @return the new Completable instance
* @throws NullPointerException if other is null
*/
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public abstract class Flowable<T> implements Publisher<T> {
*
* @param <T> the common element type
* @param sources
* an Iterable of Publishers sources competing to react first
* an Iterable of Publishers sources competing to react first. A subscription to each Publisher will
* occur in the same order as in this Iterable.
* @return a Flowable that emits the same sequence as whichever of the source Publishers first
* emitted an item or sent a termination notification
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
Expand Down Expand Up @@ -106,7 +107,8 @@ public static <T> Flowable<T> amb(Iterable<? extends Publisher<? extends T>> sou
*
* @param <T> the common element type
* @param sources
* an array of Publisher sources competing to react first
* an array of Publisher sources competing to react first. A subscription to each Publisher will
* occur in the same order as in this Iterable.
* @return a Flowable that emits the same sequence as whichever of the source Publishers first
* emitted an item or sent a termination notification
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
Expand Down Expand Up @@ -5131,7 +5133,8 @@ public final Single<Boolean> all(Predicate<? super T> predicate) {
* </dl>
*
* @param other
* a Publisher competing to react first
* a Publisher competing to react first. A subscription to this provided Publisher will occur after subscribing
* to the current Publisher.
* @return a Flowable that emits the same sequence as whichever of the source Publishers first
* emitted an item or sent a termination notification
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public abstract class Maybe<T> implements MaybeSource<T> {
* <dd>{@code amb} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the Iterable sequence of sources
* @param sources the Iterable sequence of sources. A subscription to each source will
* occur in the same order as in the Iterable.
* @return the new Maybe instance
*/
@CheckReturnValue
Expand All @@ -72,7 +73,8 @@ public static <T> Maybe<T> amb(final Iterable<? extends MaybeSource<? extends T>
* <dd>{@code ambArray} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the array of sources
* @param sources the array of sources. A subscription to each source will
* occur in the same order as in the array.
* @return the new Maybe instance
*/
@CheckReturnValue
Expand Down Expand Up @@ -1966,7 +1968,8 @@ public static <T, R> Maybe<R> zipArray(Function<? super Object[], ? extends R> z
* </dl>
*
* @param other
* a MaybeSource competing to react first
* a MaybeSource competing to react first. A subscription to this provided source will occur after
* subscribing to the current source.
* @return a Maybe that emits the same sequence as whichever of the source MaybeSources first
* signalled
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public abstract class Observable<T> implements ObservableSource<T> {
*
* @param <T> the common element type
* @param sources
* an Iterable of ObservableSources sources competing to react first
* an Iterable of ObservableSource sources competing to react first. A subscription to each source will
* occur in the same order as in the Iterable.
* @return an Observable that emits the same sequence as whichever of the source ObservableSources first
* emitted an item or sent a termination notification
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
Expand All @@ -93,7 +94,8 @@ public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extend
*
* @param <T> the common element type
* @param sources
* an array of ObservableSource sources competing to react first
* an array of ObservableSource sources competing to react first. A subscription to each source will
* occur in the same order as in the array.
* @return an Observable that emits the same sequence as whichever of the source ObservableSources first
* emitted an item or sent a termination notification
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
Expand Down Expand Up @@ -4547,7 +4549,8 @@ public final Single<Boolean> all(Predicate<? super T> predicate) {
* </dl>
*
* @param other
* an ObservableSource competing to react first
* an ObservableSource competing to react first. A subscription to this provided source will occur after
* subscribing to the current source.
* @return an Observable that emits the same sequence as whichever of the source ObservableSources first
* emitted an item or sent a termination notification
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public abstract class Single<T> implements SingleSource<T> {
* <dd>{@code amb} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the Iterable sequence of sources
* @param sources the Iterable sequence of sources. A subscription to each source will
* occur in the same order as in this Iterable.
* @return the new Single instance
* @since 2.0
*/
Expand All @@ -86,7 +87,8 @@ public static <T> Single<T> amb(final Iterable<? extends SingleSource<? extends
* <dd>{@code ambArray} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the array of sources
* @param sources the array of sources. A subscription to each source will
* occur in the same order as in this array.
* @return the new Single instance
* @since 2.0
*/
Expand Down Expand Up @@ -1493,7 +1495,8 @@ public static <T, R> Single<R> zipArray(Function<? super Object[], ? extends R>
* <dd>{@code ambWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param other the other SingleSource to race for the first emission of success or error
* @return the new Single instance
* @return the new Single instance. A subscription to this provided source will occur after subscribing
* to the current source.
* @since 2.0
*/
@CheckReturnValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,22 @@ public void run() {
}
}

@Test
public void ambWithOrder() {
Completable error = Completable.error(new RuntimeException());
Completable.complete().ambWith(error).test().assertComplete();
}

@Test
public void ambIterableOrder() {
Completable error = Completable.error(new RuntimeException());
Completable.amb(Arrays.asList(Completable.complete(), error)).test().assertComplete();
}

@Test
public void ambArrayOrder() {
Completable error = Completable.error(new RuntimeException());
Completable.ambArray(Completable.complete(), error).test().assertComplete();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -697,4 +697,24 @@ public Flowable<Integer> apply(Integer v) throws Exception {
.test()
.assertFailureAndMessage(TestException.class, "next()");
}

@Test
public void ambWithOrder() {
Flowable<Integer> error = Flowable.error(new RuntimeException());
Flowable.just(1).ambWith(error).test().assertValue(1).assertComplete();
}

@SuppressWarnings("unchecked")
@Test
public void ambIterableOrder() {
Flowable<Integer> error = Flowable.error(new RuntimeException());
Flowable.amb(Arrays.asList(Flowable.just(1), error)).test().assertValue(1).assertComplete();
}

@SuppressWarnings("unchecked")
@Test
public void ambArrayOrder() {
Flowable<Integer> error = Flowable.error(new RuntimeException());
Flowable.ambArray(Flowable.just(1), error).test().assertValue(1).assertComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -363,4 +363,24 @@ public void run() {
}
}
}

@Test
public void ambWithOrder() {
Observable<Integer> error = Observable.error(new RuntimeException());
Observable.just(1).ambWith(error).test().assertValue(1).assertComplete();
}

@SuppressWarnings("unchecked")
@Test
public void ambIterableOrder() {
Observable<Integer> error = Observable.error(new RuntimeException());
Observable.amb(Arrays.asList(Observable.just(1), error)).test().assertValue(1).assertComplete();
}

@SuppressWarnings("unchecked")
@Test
public void ambArrayOrder() {
Observable<Integer> error = Observable.error(new RuntimeException());
Observable.ambArray(Observable.just(1), error).test().assertValue(1).assertComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,24 @@ public void manySources() {
.test()
.assertResult(31);
}

@Test
public void ambWithOrder() {
Single<Integer> error = Single.error(new RuntimeException());
Single.just(1).ambWith(error).test().assertValue(1);
}

@SuppressWarnings("unchecked")
@Test
public void ambIterableOrder() {
Single<Integer> error = Single.error(new RuntimeException());
Single.amb(Arrays.asList(Single.just(1), error)).test().assertValue(1);
}

@SuppressWarnings("unchecked")
@Test
public void ambArrayOrder() {
Single<Integer> error = Single.error(new RuntimeException());
Single.ambArray(Single.just(1), error).test().assertValue(1);
}
}
20 changes: 20 additions & 0 deletions src/test/java/io/reactivex/maybe/MaybeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1580,6 +1580,26 @@ public void ambArrayOne() {
assertSame(Maybe.never(), Maybe.ambArray(Maybe.never()));
}

@Test
public void ambWithOrder() {
Maybe<Integer> error = Maybe.error(new RuntimeException());
Maybe.just(1).ambWith(error).test().assertValue(1);
}

@SuppressWarnings("unchecked")
@Test
public void ambIterableOrder() {
Maybe<Integer> error = Maybe.error(new RuntimeException());
Maybe.amb(Arrays.asList(Maybe.just(1), error)).test().assertValue(1);
}

@SuppressWarnings("unchecked")
@Test
public void ambArrayOrder() {
Maybe<Integer> error = Maybe.error(new RuntimeException());
Maybe.ambArray(Maybe.just(1), error).test().assertValue(1);
}

@SuppressWarnings("unchecked")
@Test
public void ambArray1SignalsSuccess() {
Expand Down