Skip to content

Commit af05e7a

Browse files
committed
Merge pull request ReactiveX#241 from ReactiveX/jw/mts
Add MainThreadSubscription utility class.
2 parents eb1ecfc + 5378804 commit af05e7a

File tree

2 files changed

+191
-0
lines changed

2 files changed

+191
-0
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package rx.android;
15+
16+
import android.os.Looper;
17+
import java.util.concurrent.atomic.AtomicBoolean;
18+
import rx.Subscription;
19+
import rx.android.schedulers.AndroidSchedulers;
20+
import rx.functions.Action0;
21+
22+
/**
23+
* A {@linkplain Subscription subscription} which ensures its {@linkplain #onUnsubscribe()
24+
* unsubscribe action} is executed on the main thread. When unsubscription occurs on a different
25+
* thread than the main thread, the action is posted to run on the main thread as soon as possible.
26+
* <p>
27+
* Instances of this class are useful in creating observables which interact with APIs that can
28+
* only be used on the main thread, such as UI objects.
29+
* <p>
30+
* A {@link #verifyMainThread() convenience method} is also provided for validating whether code
31+
* is being called on the main thread. Calls to this method along with instances of this class are
32+
* commonly used when creating custom observables using the following pattern:
33+
* <pre>{@code
34+
* &#064;Override public void call(Subscriber<? extends T> subscriber) {
35+
* MainThreadSubscription.verifyMainThread();
36+
*
37+
* // TODO set up behavior
38+
*
39+
* subscriber.add(new MainThreadSubscriber() {
40+
* &#064;Override public void onUnsubscribe() {
41+
* // TODO undo behavior
42+
* }
43+
* });
44+
* }
45+
* }</pre>
46+
*/
47+
public abstract class MainThreadSubscription implements Subscription {
48+
/**
49+
* Verify that the calling thread is the Android main thread.
50+
* <p>
51+
* Calls to this method are usually preconditions for subscription behavior which instances of
52+
* this class later undo. See the class documentation for an example.
53+
*
54+
* @throws IllegalStateException when called from any other thread.
55+
*/
56+
public static void verifyMainThread() {
57+
if (Looper.myLooper() != Looper.getMainLooper()) {
58+
throw new IllegalStateException(
59+
"Expected to be called on the main thread but was " + Thread.currentThread().getName());
60+
}
61+
}
62+
63+
private final AtomicBoolean unsubscribed = new AtomicBoolean();
64+
65+
@Override public final boolean isUnsubscribed() {
66+
return unsubscribed.get();
67+
}
68+
69+
@Override public final void unsubscribe() {
70+
if (unsubscribed.compareAndSet(false, true)) {
71+
if (Looper.myLooper() == Looper.getMainLooper()) {
72+
onUnsubscribe();
73+
} else {
74+
AndroidSchedulers.mainThread().createWorker().schedule(new Action0() {
75+
@Override public void call() {
76+
onUnsubscribe();
77+
}
78+
});
79+
}
80+
}
81+
}
82+
83+
protected abstract void onUnsubscribe();
84+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package rx.android;
15+
16+
import java.util.concurrent.CountDownLatch;
17+
import java.util.concurrent.atomic.AtomicBoolean;
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
import org.junit.Test;
20+
import org.junit.runner.RunWith;
21+
import org.robolectric.RobolectricTestRunner;
22+
import org.robolectric.annotation.Config;
23+
import org.robolectric.shadows.ShadowLooper;
24+
25+
import static java.util.concurrent.TimeUnit.SECONDS;
26+
import static org.junit.Assert.assertEquals;
27+
import static org.junit.Assert.assertFalse;
28+
import static org.junit.Assert.assertTrue;
29+
import static org.junit.Assert.fail;
30+
31+
@RunWith(RobolectricTestRunner.class)
32+
@Config(manifest=Config.NONE)
33+
public final class MainThreadSubscriptionTest {
34+
@Test public void verifyDoesNotThrowOnMainThread() throws InterruptedException {
35+
MainThreadSubscription.verifyMainThread();
36+
// Robolectric tests run on its main thread.
37+
}
38+
39+
@Test public void verifyThrowsOffMainThread() throws InterruptedException {
40+
final CountDownLatch latch = new CountDownLatch(1);
41+
new Thread(new Runnable() {
42+
@Override public void run() {
43+
try {
44+
MainThreadSubscription.verifyMainThread();
45+
fail();
46+
} catch (IllegalStateException e) {
47+
assertTrue(e.getMessage().startsWith("Expected to be called on the main thread"));
48+
latch.countDown();
49+
}
50+
}
51+
}).start();
52+
53+
assertTrue(latch.await(1, SECONDS));
54+
}
55+
56+
@Test public void onUnsubscribeRunsSyncOnMainThread() {
57+
ShadowLooper.pauseMainLooper();
58+
59+
final AtomicBoolean called = new AtomicBoolean();
60+
new MainThreadSubscription() {
61+
@Override protected void onUnsubscribe() {
62+
called.set(true);
63+
}
64+
}.unsubscribe();
65+
66+
assertTrue(called.get());
67+
}
68+
69+
@Test public void unsubscribeTwiceDoesNotRunTwice() {
70+
final AtomicInteger called = new AtomicInteger(0);
71+
72+
MainThreadSubscription subscription = new MainThreadSubscription() {
73+
@Override protected void onUnsubscribe() {
74+
called.incrementAndGet();
75+
}
76+
};
77+
78+
subscription.unsubscribe();
79+
subscription.unsubscribe();
80+
subscription.unsubscribe();
81+
82+
assertEquals(1, called.get());
83+
}
84+
85+
@Test public void onUnsubscribePostsOffMainThread() throws InterruptedException {
86+
ShadowLooper.pauseMainLooper();
87+
88+
final CountDownLatch latch = new CountDownLatch(1);
89+
final AtomicBoolean called = new AtomicBoolean();
90+
new Thread(new Runnable() {
91+
@Override public void run() {
92+
new MainThreadSubscription() {
93+
@Override protected void onUnsubscribe() {
94+
called.set(true);
95+
}
96+
}.unsubscribe();
97+
latch.countDown();
98+
}
99+
}).start();
100+
101+
assertTrue(latch.await(1, SECONDS));
102+
assertFalse(called.get()); // Callback has not yet run.
103+
104+
ShadowLooper.runMainLooperOneTask();
105+
assertTrue(called.get());
106+
}
107+
}

0 commit comments

Comments
 (0)