Skip to content

2.x: reintroduce OnErrorNotImplementedException for 0-1 argument subscribe() #5036

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1702,6 +1702,10 @@ public final Disposable subscribe(final Action onComplete, final Consumer<? supe
* Subscribes to this Completable and calls the given Action when this Completable
* completes normally.
* <p>
* If the Completable emits an error, it is wrapped into an
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it's twice.

If the completable emits an error...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once with the note about OnErrorNotImplemented..., once as swallowed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great opportunity to post a PR!

* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <p>
* If this Completable emits an error, it is sent to RxJavaPlugins.onError and gets swallowed.
* <dl>
* <dt><b>Scheduler:</b></dt>
Expand Down
25 changes: 19 additions & 6 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5559,6 +5559,11 @@ public final void blockingSubscribe() {

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
* <p>
* If the Flowable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <p>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an unbounded manner
Expand All @@ -5572,7 +5577,7 @@ public final void blockingSubscribe() {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(Consumer<? super T> onNext) {
FlowableBlockingSubscribe.subscribe(this, onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
FlowableBlockingSubscribe.subscribe(this, onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand Down Expand Up @@ -8983,6 +8988,10 @@ public final Disposable forEach(Consumer<? super T> onNext) {
/**
* Subscribes to the {@link Publisher} and receives notifications for each element until the
* onNext Predicate returns false.
* <p>
* If the Flowable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Backpressure:</b><dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner (i.e., no
Expand All @@ -9003,7 +9012,7 @@ public final Disposable forEach(Consumer<? super T> onNext) {
@BackpressureSupport(BackpressureKind.NONE)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable forEachWhile(Predicate<? super T> onNext) {
return forEachWhile(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
return forEachWhile(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand Down Expand Up @@ -12713,7 +12722,9 @@ public final Flowable<T> strict() {
/**
* Subscribes to a Publisher and ignores {@code onNext} and {@code onComplete} emissions.
* <p>
* If the Flowable emits an error, it is routed to the RxJavaPlugins.onError handler.
* If the Flowable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Backpressure:</b><dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner (i.e., no
Expand All @@ -12729,14 +12740,16 @@ public final Flowable<T> strict() {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER,
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}

/**
* Subscribes to a Publisher and provides a callback to handle the items it emits.
* <p>
* If the Flowable emits an error, it is routed to the RxJavaPlugins.onError handler.
* If the Flowable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Backpressure:</b><dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner (i.e., no
Expand All @@ -12757,7 +12770,7 @@ public final Disposable subscribe() {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ERROR_CONSUMER,
return subscribe(onNext, Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}

Expand Down
12 changes: 8 additions & 4 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -3592,7 +3592,9 @@ public final Maybe<T> retryWhen(
/**
* Subscribes to a Maybe and ignores {@code onSuccess} and {@code onComplete} emissions.
* <p>
* If the Maybe emits an error, it is routed to the RxJavaPlugins.onError handler.
* If the Maybe emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -3604,13 +3606,15 @@ public final Maybe<T> retryWhen(
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
* Subscribes to a Maybe and provides a callback to handle the items it emits.
* <p>
* If the Maybe emits an error, it is routed to the RxJavaPlugins.onError handler.
* If the Maybe emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -3627,7 +3631,7 @@ public final Disposable subscribe() {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onSuccess) {
return subscribe(onSuccess, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
return subscribe(onSuccess, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand Down
26 changes: 19 additions & 7 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4917,6 +4917,10 @@ public final void blockingSubscribe() {

/**
* Subscribes to the source and calls the given callbacks <strong>on the current thread</strong>.
* <p>
* If the Observable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -4926,7 +4930,7 @@ public final void blockingSubscribe() {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(Consumer<? super T> onNext) {
ObservableBlockingSubscribe.subscribe(this, onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
ObservableBlockingSubscribe.subscribe(this, onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand Down Expand Up @@ -6761,7 +6765,7 @@ public final Observable<T> doFinally(Action onFinally) {
* The action is shared between subscriptions and thus may be called concurrently from multiple
* threads; the action must be thread safe.
* <p>
* If the action throws a runtime exception, that exception is rethrown by the {@code unsubscribe()} call,
* If the action throws a runtime exception, that exception is rethrown by the {@code dispose()} call,
* sometimes as a {@code CompositeException} if there were multiple exceptions along the way.
* <p>
* Note that terminal events trigger the action unless the {@code ObservableSource} is subscribed to via {@code unsafeSubscribe()}.
Expand Down Expand Up @@ -7785,6 +7789,10 @@ public final Disposable forEach(Consumer<? super T> onNext) {
/**
* Subscribes to the {@link ObservableSource} and receives notifications for each element until the
* onNext Predicate returns false.
* <p>
* If the Observable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code forEachWhile} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -7801,7 +7809,7 @@ public final Disposable forEach(Consumer<? super T> onNext) {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable forEachWhile(Predicate<? super T> onNext) {
return forEachWhile(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
return forEachWhile(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand Down Expand Up @@ -10536,7 +10544,9 @@ public final Observable<T> startWithArray(T... items) {
/**
* Subscribes to an ObservableSource and ignores {@code onNext} and {@code onComplete} emissions.
* <p>
* If the Observable emits an error, it is routed to the RxJavaPlugins.onError handler.
* If the Observable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -10548,13 +10558,15 @@ public final Observable<T> startWithArray(T... items) {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

/**
* Subscribes to an ObservableSource and provides a callback to handle the items it emits.
* <p>
* If the Observable emits an error, it is routed to the RxJavaPlugins.onError handler.
* If the Observable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -10571,7 +10583,7 @@ public final Disposable subscribe() {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

/**
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2553,6 +2553,10 @@ public final Single<T> retryWhen(Function<? super Flowable<Throwable>, ? extends

/**
* Subscribes to a Single but ignore its emission or notification.
* <p>
* If the Single emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -2563,7 +2567,7 @@ public final Single<T> retryWhen(Function<? super Flowable<Throwable>, ? extends
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER);
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING);
}

/**
Expand Down Expand Up @@ -2594,6 +2598,10 @@ public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable>

/**
* Subscribes to a Single and provides a callback to handle the item it emits.
* <p>
* If the Single emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -2609,7 +2617,7 @@ public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable>
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onSuccess) {
return subscribe(onSuccess, Functions.ERROR_CONSUMER);
return subscribe(onSuccess, Functions.ON_ERROR_MISSING);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.exceptions;

import io.reactivex.annotations.Experimental;

/**
* Represents an exception used to signal to the {@code RxJavaPlugins.onError()} that a
* callback-based subscribe() method on a base reactive type didn't specify
* an onError handler.
* @since 2.0.6 - experimental
*/
@Experimental
public final class OnErrorNotImplementedException extends RuntimeException {

private static final long serialVersionUID = -6298857009889503852L;

/**
* Customizes the {@code Throwable} with a custom message and wraps it before it
* is signalled to the {@code RxJavaPlugins.onError()} handler as {@code OnErrorNotImplementedException}.
*
* @param message
* the message to assign to the {@code Throwable} to signal
* @param e
* the {@code Throwable} to signal; if null, a NullPointerException is constructed
*/
public OnErrorNotImplementedException(String message, Throwable e) {
super(message, e != null ? e : new NullPointerException());
}

/**
* Wraps the {@code Throwable} before it
* is signalled to the {@code RxJavaPlugins.onError()}
* handler as {@code OnErrorNotImplementedException}.
*
* @param e
* the {@code Throwable} to signal; if null, a NullPointerException is constructed
*/
public OnErrorNotImplementedException(Throwable e) {
super(e != null ? e.getMessage() : null, e != null ? e : new NullPointerException());
}
}
12 changes: 12 additions & 0 deletions src/main/java/io/reactivex/internal/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.reactivestreams.Subscription;

import io.reactivex.*;
import io.reactivex.exceptions.OnErrorNotImplementedException;
import io.reactivex.functions.*;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Timed;
Expand Down Expand Up @@ -218,6 +219,17 @@ public void accept(Throwable error) {
}
};

/**
* Wraps the consumed Throwable into an OnErrorNotImplementedException and
* signals it to the plugin error handler.
*/
public static final Consumer<Throwable> ON_ERROR_MISSING = new Consumer<Throwable>() {
@Override
public void accept(Throwable error) {
RxJavaPlugins.onError(new OnErrorNotImplementedException(error));
}
};

public static final LongConsumer EMPTY_LONG_CONSUMER = new LongConsumer() {
@Override
public void accept(long v) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

import io.reactivex.CompletableObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class CallbackCompletableObserver
Expand All @@ -43,7 +43,7 @@ public CallbackCompletableObserver(Consumer<? super Throwable> onError, Action o

@Override
public void accept(Throwable e) {
RxJavaPlugins.onError(e);
RxJavaPlugins.onError(new OnErrorNotImplementedException(e));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

import io.reactivex.CompletableObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.*;
import io.reactivex.exceptions.OnErrorNotImplementedException;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class EmptyCompletableObserver
Expand Down Expand Up @@ -46,7 +47,7 @@ public void onComplete() {
@Override
public void onError(Throwable e) {
lazySet(DisposableHelper.DISPOSED);
RxJavaPlugins.onError(e);
RxJavaPlugins.onError(new OnErrorNotImplementedException(e));
}

@Override
Expand Down
Loading