Skip to content

Commit

Permalink
Rx Completable for ApolloPrefetch (#408)
Browse files Browse the repository at this point in the history
  • Loading branch information
VisheshVadhera authored and sav007 committed Apr 13, 2017
1 parent c8deb5a commit 3f279ef
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@
import okhttp3.mockwebserver.MockWebServer;
import rx.Observer;
import rx.Subscription;
import rx.observers.AssertableSubscriber;
import rx.observers.TestSubscriber;

import static com.google.common.truth.Truth.assertThat;

public class RxApolloTest {

private ApolloClient apolloClient;
private MockWebServer server;

private static final int RX_DELAY_SECONDS = 2;
private static final long TIME_OUT_SECONDS = 3;

@Before public void setUp() {
Expand Down Expand Up @@ -88,7 +92,7 @@ public class RxApolloTest {
TestSubscriber<EpisodeHeroName.Data> subscriber = new TestSubscriber<>();
Subscription subscription = RxApollo
.from(call)
.delay(5, TimeUnit.SECONDS)
.delay(RX_DELAY_SECONDS, TimeUnit.SECONDS)
.subscribe(subscriber);
subscription.unsubscribe();
subscriber
Expand All @@ -97,6 +101,36 @@ public class RxApolloTest {

}

@Test public void testRxPrefetchCompletes() throws IOException {
EpisodeHeroName query = EpisodeHeroName.builder().episode(Episode.EMPIRE).build();
server.enqueue(mockResponse("EpisodeHeroNameResponseWithId.json"));

RxApollo
.from(apolloClient.prefetch(query))
.test()
.awaitTerminalEvent()
.assertNoErrors()
.assertCompleted();
}

@Test public void testRxPrefetchIsCanceledWhenUnsubscribed() throws IOException {

EpisodeHeroName query = EpisodeHeroName.builder().episode(Episode.EMPIRE).build();
server.enqueue(mockResponse("EpisodeHeroNameResponseWithId.json"));

ApolloPrefetch prefetch = apolloClient.prefetch(query);

AssertableSubscriber<Void> subscriber = RxApollo
.from(prefetch)
.delay(RX_DELAY_SECONDS, TimeUnit.SECONDS)
.test();

subscriber.unsubscribe();

subscriber.assertUnsubscribed();
subscriber.assertNotCompleted();
}

@Test
public void testRxQueryWatcherUpdated_SameQuery_DifferentResults() throws IOException, InterruptedException,
TimeoutException, ApolloException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@


import com.apollographql.apollo.ApolloCall;
import com.apollographql.apollo.ApolloPrefetch;
import com.apollographql.apollo.ApolloWatcher;
import com.apollographql.apollo.api.Response;
import com.apollographql.apollo.exception.ApolloException;
import com.apollographql.apollo.internal.util.Cancelable;

import javax.annotation.Nonnull;

import rx.Completable;
import rx.CompletableSubscriber;
import rx.Emitter;
import rx.Observable;
import rx.Single;
import rx.SingleSubscriber;
import rx.Subscription;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
import rx.functions.Action1;
Expand Down Expand Up @@ -103,6 +107,44 @@ public static <T> Observable<T> from(@Nonnull final ApolloWatcher<T> watcher) {
});
}

/**
* Converts an {@link ApolloPrefetch} to a Completable.
*
* @param prefetch the ApolloPrefetch to convert
* @return the converted Completable
*/
@Nonnull public static Completable from(@Nonnull final ApolloPrefetch prefetch) {
checkNotNull(prefetch, "prefetch == null");

return Completable.create(new Completable.OnSubscribe() {
@Override public void call(final CompletableSubscriber subscriber) {
Subscription subscription = getSubscription(subscriber, prefetch);

try {
prefetch.execute();
if (!subscription.isUnsubscribed()) {
subscriber.onCompleted();
}
} catch (ApolloException e) {
Exceptions.throwIfFatal(e);
if (!subscription.isUnsubscribed()) {
subscriber.onError(e);
}
}
}
});
}

private static Subscription getSubscription(CompletableSubscriber subscriber, final Cancelable cancelable) {
Subscription subscription = Subscriptions.create(new Action0() {
@Override public void call() {
cancelable.cancel();
}
});
subscriber.onSubscribe(subscription);
return subscription;
}

private static <T> void cancelOnSingleUnsubscribe(SingleSubscriber<? super T> subscriber, final Cancelable toCancel) {
subscriber.add(Subscriptions.create(new Action0() {
@Override public void call() {
Expand Down

0 comments on commit 3f279ef

Please sign in to comment.