Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,21 @@
*/
package io.reactivex.android.schedulers;

import android.annotation.SuppressLint;
import android.os.Build;
import android.os.Handler;
import android.os.Looper;

import java.util.concurrent.Callable;

import android.os.Message;
import io.reactivex.Scheduler;
import io.reactivex.android.plugins.RxAndroidPlugins;
import java.util.concurrent.Callable;

/** Android-specific Schedulers. */
public final class AndroidSchedulers {

private static final class MainHolder {

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
Expand All @@ -43,8 +44,33 @@ public static Scheduler mainThread() {

/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
return from(looper, false);
}

/**
* A {@link Scheduler} which executes actions on {@code looper}.
*
* @param async if true, the scheduler will use async messaging on API >= 16 to avoid VSYNC
* locking. On API < 16, this will no-op.
* @see Message#setAsynchronous(boolean)
*/
@SuppressLint("NewApi")
public static Scheduler from(Looper looper, boolean async) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
boolean useAsync = async && Build.VERSION.SDK_INT >= 16;
Handler handler = new Handler(looper);
if (useAsync && Build.VERSION.SDK_INT < 22) {
// Confirm the method is there
Message message = Message.obtain(handler);
try {
message.setAsynchronous(true);
} catch (NoSuchMethodError e) {
useAsync = false;
} finally {
message.recycle();
}
}
return new HandlerScheduler(handler, useAsync);
}

private AndroidSchedulers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.reactivex.android.schedulers;

import android.annotation.SuppressLint;
import android.os.Handler;
import android.os.Message;
import io.reactivex.Scheduler;
Expand All @@ -23,9 +24,11 @@

final class HandlerScheduler extends Scheduler {
private final Handler handler;
private final boolean async;

HandlerScheduler(Handler handler) {
HandlerScheduler(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}

@Override
Expand All @@ -41,18 +44,21 @@ public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {

@Override
public Worker createWorker() {
return new HandlerWorker(handler);
return new HandlerWorker(handler, async);
}

private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;

private volatile boolean disposed;

HandlerWorker(Handler handler) {
HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}

@SuppressLint("NewApi") // the async flag is version-checked in its factory
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
Expand All @@ -67,6 +73,9 @@ public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

Message message = Message.obtain(handler, scheduled);
if (async) {
message.setAsynchronous(true);
}
message.obj = this; // Used as token for batch disposal of this worker's runnables.

handler.sendMessageDelayed(message, unit.toMillis(delay));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.robolectric.RobolectricTestRunner;
import org.robolectric.ParameterizedRobolectricTestRunner;
import org.robolectric.annotation.Config;
import org.robolectric.shadows.ShadowLooper;

Expand All @@ -46,9 +48,24 @@
import static org.robolectric.shadows.ShadowLooper.runUiThreadTasksIncludingDelayedTasks;
import static org.robolectric.shadows.ShadowLooper.unPauseMainLooper;

@RunWith(RobolectricTestRunner.class)
@Config(manifest=Config.NONE)
@RunWith(ParameterizedRobolectricTestRunner.class)
@Config(manifest=Config.NONE, sdk = 16)
public final class HandlerSchedulerTest {

@ParameterizedRobolectricTestRunner.Parameters(name = "async = {0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{true},
{false}
});
}

private Scheduler scheduler;

public HandlerSchedulerTest(boolean async) {
this.scheduler = new HandlerScheduler(new Handler(Looper.getMainLooper()), async);
}

@Before
public void setUp() {
RxJavaPlugins.reset();
Expand All @@ -61,8 +78,6 @@ public void tearDown() {
unPauseMainLooper();
}

private Scheduler scheduler = new HandlerScheduler(new Handler(Looper.getMainLooper()));

@Test
public void directScheduleOncePostsImmediately() {
CountingRunnable counter = new CountingRunnable();
Expand Down