Skip to content

Commit

Permalink
Add doOnSubscribe for Single
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron He committed Mar 3, 2016
1 parent 423172f commit ccd24b5
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
22 changes: 22 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2250,6 +2250,28 @@ public void onNext(T t) {
return lift(new OperatorDoOnEach<T>(observer));
}

/**
* Modifies the source {@code Single} so that it invokes the given action when it is subscribed from
* its subscribers. Each subscription will result in an invocation of the given action except when the
* source {@code Single} is reference counted, in which case the source {@code Single} will invoke
* the given action for the first subscription.
* <p>
* <img width="640" height="390" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnSubscribe.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param subscribe
* the action that gets called when an observer subscribes to this {@code Single}
* @return the source {@code Single} modified so as to call this Action when appropriate
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
@Experimental
public final Single<T> doOnSubscribe(final Action0 subscribe) {
return lift(new OperatorDoOnSubscribe<T>(subscribe));
}

/**
* Returns an Single that emits the items emitted by the source Single shifted forward in time by a
* specified delay. Error notifications from the source Single are not delayed.
Expand Down
37 changes: 37 additions & 0 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,43 @@ public void doOnSuccessShouldNotSwallowExceptionThrownByAction() {
verify(action).call(eq("value"));
}

@Test
public void doOnSubscribeShouldInvokeAction() {
Action0 action = mock(Action0.class);
Single<Integer> single = Single.just(1).doOnSubscribe(action);

verifyZeroInteractions(action);

single.subscribe();
single.subscribe();

verify(action, times(2)).call();
}

@Test
public void doOnSubscribeShouldInvokeActionBeforeSubscriberSubscribes() {
final List<String> callSequence = new ArrayList<String>(2);

Single<Integer> single = Single.create(new OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> singleSubscriber) {
callSequence.add("onSubscribe");
singleSubscriber.onSuccess(1);
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
callSequence.add("doOnSubscribe");
}
});

single.subscribe();

assertEquals(2, callSequence.size());
assertEquals("doOnSubscribe", callSequence.get(0));
assertEquals("onSubscribe", callSequence.get(1));
}

@Test
public void delayWithSchedulerShouldDelayCompletion() {
TestScheduler scheduler = new TestScheduler();
Expand Down

0 comments on commit ccd24b5

Please sign in to comment.