Skip to content

Commit

Permalink
Avoid swallowing errors in Completable
Browse files Browse the repository at this point in the history
Instead, deliver them up to the thread's uncaught exception handler.

Fixes #3726
  • Loading branch information
loganj committed Feb 25, 2016
1 parent a57bccc commit 8b55303
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 17 deletions.
17 changes: 14 additions & 3 deletions src/main/java/rx/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1835,6 +1835,7 @@ public void onCompleted() {
public void onError(Throwable e) {
ERROR_HANDLER.handleError(e);
mad.unsubscribe();
deliverUncaughtException(e);
}

@Override
Expand Down Expand Up @@ -1864,14 +1865,17 @@ public void onCompleted() {
onComplete.call();
} catch (Throwable e) {
ERROR_HANDLER.handleError(e);
deliverUncaughtException(e);
} finally {
mad.unsubscribe();
}
mad.unsubscribe();
}

@Override
public void onError(Throwable e) {
ERROR_HANDLER.handleError(e);
mad.unsubscribe();
deliverUncaughtException(e);
}

@Override
Expand Down Expand Up @@ -1915,8 +1919,10 @@ public void onError(Throwable e) {
} catch (Throwable ex) {
e = new CompositeException(Arrays.asList(e, ex));
ERROR_HANDLER.handleError(e);
deliverUncaughtException(e);
} finally {
mad.unsubscribe();
}
mad.unsubscribe();
}

@Override
Expand All @@ -1927,7 +1933,12 @@ public void onSubscribe(Subscription d) {

return mad;
}


private static void deliverUncaughtException(Throwable e) {
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, e);
}

/**
* Subscribes the given CompletableSubscriber to this Completable instance.
* @param s the CompletableSubscriber, not null
Expand Down
16 changes: 16 additions & 0 deletions src/test/java/rx/CapturingUncaughtExceptionHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package rx;

import java.util.concurrent.CountDownLatch;

public final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
public int count = 0;
public Throwable caught;
public CountDownLatch completed = new CountDownLatch(1);

@Override
public void uncaughtException(Thread t, Throwable e) {
count++;
caught = e;
completed.countDown();
}
}
79 changes: 78 additions & 1 deletion src/test/java/rx/CompletableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2700,7 +2700,64 @@ public void call(CompletableSubscriber s) {

Assert.assertTrue(name.get().startsWith("RxComputation"));
}


@Test
public void subscribeEmptyOnError() {
expectUncaughtTestException(new Action0() {
@Override public void call() {
error.completable.subscribe();
}
});
}

@Test
public void subscribeOneActionOnError() {
expectUncaughtTestException(new Action0() {
@Override
public void call() {
error.completable.subscribe(new Action0() {
@Override
public void call() {
}
});
}
});
}

@Test
public void subscribeOneActionThrowFromOnCompleted() {
expectUncaughtTestException(new Action0() {
@Override
public void call() {
normal.completable.subscribe(new Action0() {
@Override
public void call() {
throw new TestException();
}
});
}
});
}

@Test
public void subscribeTwoActionsThrowFromOnError() {
expectUncaughtTestException(new Action0() {
@Override
public void call() {
error.completable.subscribe(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throw new TestException();
}
}, new Action0() {
@Override
public void call() {
}
});
}
});
}

@Test(timeout = 1000)
public void timeoutEmitError() {
Throwable e = Completable.never().timeout(100, TimeUnit.MILLISECONDS).get();
Expand Down Expand Up @@ -3742,4 +3799,24 @@ public void call(Throwable e) {
assertNotNull("Unsubscribed before the call to onError", subscriptionRef.get());
}

private static void expectUncaughtTestException(Action0 action) {
Thread.UncaughtExceptionHandler originalHandler = Thread.getDefaultUncaughtExceptionHandler();
CapturingUncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler();
Thread.setDefaultUncaughtExceptionHandler(handler);
try {
action.call();
assertEquals("Should have received exactly 1 exception", 1, handler.count);
Throwable caught = handler.caught;
while (caught != null) {
if (caught instanceof TestException) break;
if (caught == caught.getCause()) break;
caught = caught.getCause();
}
assertTrue("A TestException should have been delivered to the handler",
caught instanceof TestException);
} finally {
Thread.setDefaultUncaughtExceptionHandler(originalHandler);
}
}

}
14 changes: 1 addition & 13 deletions src/test/java/rx/schedulers/SchedulerTests.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package rx.schedulers;

import rx.CapturingUncaughtExceptionHandler;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
Expand Down Expand Up @@ -87,19 +88,6 @@ static void testHandledErrorIsNotDeliveredToThreadHandler(Scheduler scheduler) t
}
}

private static final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
int count = 0;
Throwable caught;
CountDownLatch completed = new CountDownLatch(1);

@Override
public void uncaughtException(Thread t, Throwable e) {
count++;
caught = e;
completed.countDown();
}
}

private static final class CapturingObserver<T> implements Observer<T> {
CountDownLatch completed = new CountDownLatch(1);
int errorCount = 0;
Expand Down

0 comments on commit 8b55303

Please sign in to comment.