Skip to content

Commit

Permalink
Trying to extend the Scheduler interface according to the comments at
Browse files Browse the repository at this point in the history
  • Loading branch information
jmhofer committed Apr 9, 2013
1 parent 04068c1 commit 0950c46
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 0 deletions.
40 changes: 40 additions & 0 deletions rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,31 @@

import rx.util.functions.Action0;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Represents an object that schedules units of work.
*/
public interface Scheduler {

/**
* Schedules a cancelable action to be executed.
*
* @param state State to pass into the action.
* @param action Action to schedule.
* @return a subscription to be able to unsubscribe from action.
*/
<T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action);

/**
* Schedules a cancelable action to be executed.
*
* @param action Action to schedule.
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Func1<Scheduler, Subscription> action);

/**
* Schedules a cancelable action to be executed.
*
Expand All @@ -43,6 +62,27 @@ public interface Scheduler {
*/
Subscription schedule(Action0 action);

/**
* Schedules a cancelable action to be executed in dueTime.
*
* @param state State to pass into the action.
* @param action Action to schedule.
* @param dueTime Time the action is due for executing.
* @param unit Time unit of the due time.
* @return a subscription to be able to unsubscribe from action.
*/
<T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit);

/**
* Schedules a cancelable action to be executed in dueTime.
*
* @param action Action to schedule.
* @param dueTime Time the action is due for executing.
* @param unit Time unit of the due time.
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Func1<Scheduler, Subscription> action, long dueTime, TimeUnit unit);

/**
* Schedules an action to be executed in dueTime.
*
Expand Down
42 changes: 42 additions & 0 deletions rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/* package */abstract class AbstractScheduler implements Scheduler {

Expand All @@ -30,11 +32,51 @@ public Subscription schedule(Action0 action) {
return schedule(asFunc0(action));
}

@Override
public Subscription schedule(final Func1<Scheduler, Subscription> action) {
return schedule(new Func0<Subscription>() {
@Override
public Subscription call() {
return action.call(AbstractScheduler.this);
}
});
}

@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
return schedule(new Func0<Subscription>() {
@Override
public Subscription call() {
return action.call(AbstractScheduler.this, state);
}
});
}

@Override
public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
return schedule(asFunc0(action), dueTime, unit);
}

@Override
public Subscription schedule(final Func1<Scheduler, Subscription> action, long dueTime, TimeUnit unit) {
return schedule(new Func0<Subscription>() {
@Override
public Subscription call() {
return action.call(AbstractScheduler.this);
}
}, dueTime, unit);
}

@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
return schedule(new Func0<Subscription>() {
@Override
public Subscription call() {
return action.call(AbstractScheduler.this, state);
}
}, dueTime, unit);
}

@Override
public long now() {
return System.nanoTime();
Expand Down
21 changes: 21 additions & 0 deletions rxjava-core/src/main/java/rx/operators/Tester.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import rx.util.functions.Action0;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Common utility functions for testing operator implementations.
Expand Down Expand Up @@ -289,6 +290,16 @@ public Subscription schedule(Func0<Subscription> action) {
return underlying.schedule(action);
}

@Override
public Subscription schedule(Func1<Scheduler, Subscription> action) {
return underlying.schedule(action);
}

@Override
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
return underlying.schedule(state, action);
}

@Override
public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
return underlying.schedule(action, dueTime, unit);
Expand All @@ -299,6 +310,16 @@ public Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit
return underlying.schedule(action, dueTime, unit);
}

@Override
public Subscription schedule(Func1<Scheduler, Subscription> action, long dueTime, TimeUnit unit) {
return underlying.schedule(action, dueTime, unit);
}

@Override
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
return underlying.schedule(state, action, dueTime, unit);
}

@Override
public long now() {
return underlying.now();
Expand Down

0 comments on commit 0950c46

Please sign in to comment.