Skip to content

Scheduler

David Gross edited this page May 20, 2014 · 38 revisions

If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators (or particular Observables) to operate on particular Schedulers.

You can make an Observable act on a particular Scheduler by means of the observeOn and subscribeOn operators. You can also split an operator that works on an Observable onto multiple threads with the parallel operator.

Many of the RxJava Observable operators have varieties that take a Scheduler as a parameter. These instruct the operator to do some or all of its work on a particular Scheduler.

See also:

Varieties of Scheduler

You obtain a Scheduler from the factory methods described in the Schedulers class. The following table shows the varieties of Scheduler that are available to you by means of these methods:

Scheduler purpose
Schedulers.computation( ) meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead)
Schedulers.trampoline( ) queues work to begin on the current thread after any already-queued work
Schedulers.immediate( ) schedules work to begin immediately in the current thread
Schedulers.io( ) meant for I/O-bound work such as asynchronous performance of blocking I/O, this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation( )
Schedulers.newThread( ) creates a new thread for each unit of work
Schedulers.executor( ) TBD
## Default Schedulers for RxJava Observable operators

Some Observable operators in RxJava have alternate forms that allow you to set which Scheduler the operator will use for (at least some part of) its operation. For these operators, if you do not set the Scheduler, the operator will use the default computation Scheduler.

Other operators do not have a form that permits you to set their Schedulers. Some of these, like startWith, empty, error, from, just, merge, and range do not use a Scheduler. A few others use particular schedulers, as in the following table:

operator Scheduler
parallelMerge currentThread
repeat currentThread
timeInterval immediate
timestamp immediate

Using Schedulers

Aside from passing these Schedulers in to RxJava Observable operators, you can also use them to schedule your own work on Subscriptions. The following example uses the schedule( ) method of the Scheduler class to schedule work on the newThread Scheduler:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
    }

});
// some time later...
worker.unsubscribe();

Recursive Schedulers

To schedule recursive calls, you can use schedule( ) and then schedule(this) on the Worker object:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this);
    }

});
// some time later...
worker.unsubscribe();

Checking or Setting Unsubscribed Status

Objects of the Worker class implement the Subscription interface, with its isUnsubscribed( ) and unsubscribe( ) methods, so you can stop work when a subscription is cancelled, or you can cancel the subscription from within the scheduled task:

Worker worker = Schedulers.newThread.createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

    @Override
    public void call() {
        while(!worker.isUnsubscribed()) {
            status = yourWork();
            if(QUIT == status) { worker.unsubscribe(); }
        }
    }

});

The Worker is also a Subscription and so you can (and should, eventually) call its unsubscribe( ) method to signal that it can halt work and release resources:

worker.unsubscribe();

Delayed and Periodic Schedulers

You can also use a version of schedule( ) that delays your action on the given Scheduler until a certain timespan has passed. The following example schedules someAction to be performed on someScheduler after 500ms have passed according to that Scheduler's clock:

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);

Another Scheduler method allows you to schedule an action to take place at regular intervals. The following example schedules someAction to be performed on someScheduler after 500ms have passed, and then every 250ms thereafter:

someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
Clone this wiki locally