Skip to content

Commit

Permalink
Merge pull request #3733 from loganj/completable-errors
Browse files Browse the repository at this point in the history
Avoid swallowing errors in Completable
  • Loading branch information
stevegury committed Mar 2, 2016
2 parents 02e6903 + 8b55303 commit 423172f
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 17 deletions.
17 changes: 14 additions & 3 deletions src/main/java/rx/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1835,6 +1835,7 @@ public void onCompleted() {
public void onError(Throwable e) {
ERROR_HANDLER.handleError(e);
mad.unsubscribe();
deliverUncaughtException(e);
}

@Override
Expand Down Expand Up @@ -1864,14 +1865,17 @@ public void onCompleted() {
onComplete.call();
} catch (Throwable e) {
ERROR_HANDLER.handleError(e);
deliverUncaughtException(e);
} finally {
mad.unsubscribe();
}
mad.unsubscribe();
}

@Override
public void onError(Throwable e) {
ERROR_HANDLER.handleError(e);
mad.unsubscribe();
deliverUncaughtException(e);
}

@Override
Expand Down Expand Up @@ -1915,8 +1919,10 @@ public void onError(Throwable e) {
} catch (Throwable ex) {
e = new CompositeException(Arrays.asList(e, ex));
ERROR_HANDLER.handleError(e);
deliverUncaughtException(e);
} finally {
mad.unsubscribe();
}
mad.unsubscribe();
}

@Override
Expand All @@ -1927,7 +1933,12 @@ public void onSubscribe(Subscription d) {

return mad;
}


private static void deliverUncaughtException(Throwable e) {
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, e);
}

/**
* Subscribes the given CompletableSubscriber to this Completable instance.
* @param s the CompletableSubscriber, not null
Expand Down
16 changes: 16 additions & 0 deletions src/test/java/rx/CapturingUncaughtExceptionHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package rx;

import java.util.concurrent.CountDownLatch;

public final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
public int count = 0;
public Throwable caught;
public CountDownLatch completed = new CountDownLatch(1);

@Override
public void uncaughtException(Thread t, Throwable e) {
count++;
caught = e;
completed.countDown();
}
}
79 changes: 78 additions & 1 deletion src/test/java/rx/CompletableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2700,7 +2700,64 @@ public void call(CompletableSubscriber s) {

Assert.assertTrue(name.get().startsWith("RxComputation"));
}


@Test
public void subscribeEmptyOnError() {
expectUncaughtTestException(new Action0() {
@Override public void call() {
error.completable.subscribe();
}
});
}

@Test
public void subscribeOneActionOnError() {
expectUncaughtTestException(new Action0() {
@Override
public void call() {
error.completable.subscribe(new Action0() {
@Override
public void call() {
}
});
}
});
}

@Test
public void subscribeOneActionThrowFromOnCompleted() {
expectUncaughtTestException(new Action0() {
@Override
public void call() {
normal.completable.subscribe(new Action0() {
@Override
public void call() {
throw new TestException();
}
});
}
});
}

@Test
public void subscribeTwoActionsThrowFromOnError() {
expectUncaughtTestException(new Action0() {
@Override
public void call() {
error.completable.subscribe(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throw new TestException();
}
}, new Action0() {
@Override
public void call() {
}
});
}
});
}

@Test(timeout = 1000)
public void timeoutEmitError() {
Throwable e = Completable.never().timeout(100, TimeUnit.MILLISECONDS).get();
Expand Down Expand Up @@ -3742,4 +3799,24 @@ public void call(Throwable e) {
assertNotNull("Unsubscribed before the call to onError", subscriptionRef.get());
}

private static void expectUncaughtTestException(Action0 action) {
Thread.UncaughtExceptionHandler originalHandler = Thread.getDefaultUncaughtExceptionHandler();
CapturingUncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler();
Thread.setDefaultUncaughtExceptionHandler(handler);
try {
action.call();
assertEquals("Should have received exactly 1 exception", 1, handler.count);
Throwable caught = handler.caught;
while (caught != null) {
if (caught instanceof TestException) break;
if (caught == caught.getCause()) break;
caught = caught.getCause();
}
assertTrue("A TestException should have been delivered to the handler",
caught instanceof TestException);
} finally {
Thread.setDefaultUncaughtExceptionHandler(originalHandler);
}
}

}
14 changes: 1 addition & 13 deletions src/test/java/rx/schedulers/SchedulerTests.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package rx.schedulers;

import rx.CapturingUncaughtExceptionHandler;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
Expand Down Expand Up @@ -87,19 +88,6 @@ static void testHandledErrorIsNotDeliveredToThreadHandler(Scheduler scheduler) t
}
}

private static final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
int count = 0;
Throwable caught;
CountDownLatch completed = new CountDownLatch(1);

@Override
public void uncaughtException(Thread t, Throwable e) {
count++;
caught = e;
completed.countDown();
}
}

private static final class CapturingObserver<T> implements Observer<T> {
CountDownLatch completed = new CountDownLatch(1);
int errorCount = 0;
Expand Down

0 comments on commit 423172f

Please sign in to comment.