From 939b5ce0bb17dfd1660a5750abe46b9be473d8eb Mon Sep 17 00:00:00 2001 From: hqzxzwb Date: Thu, 26 Aug 2021 18:09:37 +0800 Subject: [PATCH] Align hasCustomOnError behavior of CallbackCompletableObserver with LambdaObserver, ConsumerSingleObserver and so on (#7326) Co-authored-by: zhuwenbo --- .../io/reactivex/rxjava3/core/Completable.java | 6 +----- .../observers/CallbackCompletableObserver.java | 15 +++------------ .../CallbackCompletableObserverTest.java | 2 +- 3 files changed, 5 insertions(+), 18 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java index 2f424fd405..910c7a2fc9 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Completable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java @@ -2996,11 +2996,7 @@ public final Disposable subscribe( @NonNull @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(@NonNull Action onComplete) { - Objects.requireNonNull(onComplete, "onComplete is null"); - - CallbackCompletableObserver observer = new CallbackCompletableObserver(onComplete); - subscribe(observer); - return observer; + return subscribe(onComplete, Functions.ON_ERROR_MISSING); } /** diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/CallbackCompletableObserver.java b/src/main/java/io/reactivex/rxjava3/internal/observers/CallbackCompletableObserver.java index f0e0610ead..be2da69160 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/observers/CallbackCompletableObserver.java +++ b/src/main/java/io/reactivex/rxjava3/internal/observers/CallbackCompletableObserver.java @@ -20,33 +20,24 @@ import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; +import io.reactivex.rxjava3.internal.functions.Functions; import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection; import io.reactivex.rxjava3.plugins.RxJavaPlugins; public final class CallbackCompletableObserver extends AtomicReference - implements CompletableObserver, Disposable, Consumer, LambdaConsumerIntrospection { + implements CompletableObserver, Disposable, LambdaConsumerIntrospection { private static final long serialVersionUID = -4361286194466301354L; final Consumer onError; final Action onComplete; - public CallbackCompletableObserver(Action onComplete) { - this.onError = this; - this.onComplete = onComplete; - } - public CallbackCompletableObserver(Consumer onError, Action onComplete) { this.onError = onError; this.onComplete = onComplete; } - @Override - public void accept(Throwable e) { - RxJavaPlugins.onError(new OnErrorNotImplementedException(e)); - } - @Override public void onComplete() { try { @@ -86,6 +77,6 @@ public boolean isDisposed() { @Override public boolean hasCustomOnError() { - return onError != this; + return onError != Functions.ON_ERROR_MISSING; } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/observers/CallbackCompletableObserverTest.java b/src/test/java/io/reactivex/rxjava3/internal/observers/CallbackCompletableObserverTest.java index ee95b5a13e..46c4cc4463 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/observers/CallbackCompletableObserverTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/observers/CallbackCompletableObserverTest.java @@ -24,7 +24,7 @@ public final class CallbackCompletableObserverTest extends RxJavaTest { @Test public void emptyActionShouldReportNoCustomOnError() { - CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.EMPTY_ACTION); + CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION); assertFalse(o.hasCustomOnError()); }