Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Describe merge() error handling. #5781

Merged
merged 1 commit into from
Jan 3, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2965,6 +2965,19 @@ public static <T> Flowable<T> mergeArray(int maxConcurrency, int bufferSize, Pub
* backpressure; if violated, the operator <em>may</em> signal {@code MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If any of the source {@code Publisher}s signal a {@code Throwable} via {@code onError}, the resulting
* {@code Flowable} terminates with that {@code Throwable} and all other source {@code Publisher}s are cancelled.
* If more than one {@code Publisher} signals an error, the resulting {@code Flowable} may terminate with the
* first one's error or, depending on the concurrency of the sources, may terminate with a
* {@code CompositeException} containing two or more of the various error signals.
* {@code Throwable}s that didn't make into the composite will be sent (individually) to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} method as <em>undeliverable errors</em>. Similarly, {@code Throwable}s
* signaled by source(s) after the returned {@code Flowable} has been cancelled or terminated with a
* (composite) error will be sent to the same global error handler.
* Use {@link #mergeDelayError(Iterable)} to merge sources and terminate only when all source {@code Publisher}s
* have completed or failed with an error.
* </dd>
Copy link

@tony-root tony-root Dec 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to explicitly state that split-merge pattern like

PublishSubject<Boolean> ps = PublishSubject.create();

Observable.merge(
        ps.filter((condition) -> condition),
        ps.filter((condition) -> !condition)
).subscribe(
        (next) -> { },
        (error) -> error.printStackTrace()
);

ps.onError(new RuntimeException("Will cause UndeliverableException"));

is guaranteed to produce an UndeliverableException in addition to onError?

Maybe in Wiki instead of javadoc?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main effect is the merge here, not the split. Also I'm not sure how often people find out about this behavior through this split-merge pattern.

Copy link

@tony-root tony-root Dec 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed merge is the main reason, however every time people use split-merge and don't find out about this behavior they introduce a potential hard-to-find bug.

It still seems to me that merge should by default behave like mergeFirstErrorOnly from #5779 discussion, but I'm definitely not pushing.

* </dl>
*
* @param <T> the common element base type
Expand All @@ -2973,6 +2986,7 @@ public static <T> Flowable<T> mergeArray(int maxConcurrency, int bufferSize, Pub
* @return a Flowable that emits items that are the result of flattening the items emitted by the
* Publishers in the Iterable
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #mergeDelayError(Iterable)
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@CheckReturnValue
Expand Down