Skip to content

Added Observable.concat(Iterable) #4330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,32 @@ public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends Ob
return create(new OnSubscribeCombineLatest<T, R>(null, sources, combineFunction, RxRingBuffer.SIZE, true));
}

/**
* Flattens an Iterable of Observables into one Observable, one after the other, without
* interleaving them.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. The {@code Observable}
* sources are expected to honor backpressure as well.
* If any of the source {@code Observable}s violate this, it <em>may</em> throw an
* {@code IllegalStateException} when the source {@code Observable} completes.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common element base type
* @param sequences
* the Iterable of Observables
* @return an Observable that emits items that are the result of flattening the items emitted by the
* Observables in the Iterable, one after the other, without interleaving them
* @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
*/
public static <T> Observable<T> concat(Iterable<? extends Observable<? extends T>> sequences) {
return concat(from(sequences));
}

/**
* Returns an Observable that emits the items emitted by each of the Observables emitted by the source
* Observable, one after the other, without interleaving them.
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/rx/ConcatTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testConcatWithIterableOfObservable() {
@SuppressWarnings("unchecked")
Iterable<Observable<String>> is = Arrays.asList(o1, o2, o3);

List<String> values = Observable.concat(Observable.from(is)).toList().toBlocking().single();
List<String> values = Observable.concat(is).toList().toBlocking().single();

assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
Expand Down