-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SubscribeOn/ObserveOn Implementation #199
Changes from all commits
dfc7841
0aa6ca2
1896da3
9eb111e
86a750c
df09fcb
2d1c45d
db9f9a6
eaa0316
81ee35d
bd32659
b24b42f
9cfb294
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/** | ||
* Copyright 2013 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package rx; | ||
|
||
import rx.util.functions.Action0; | ||
import rx.util.functions.Func0; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* Represents an object that schedules units of work. | ||
*/ | ||
public interface Scheduler { | ||
|
||
/** | ||
* Schedules a cancelable action to be executed. | ||
* | ||
* @param action action | ||
* @return a subscription to be able to unsubscribe from action. | ||
*/ | ||
Subscription schedule(Func0<Subscription> action); | ||
|
||
/** | ||
* Schedules an action to be executed. | ||
* | ||
* @param action action | ||
* @return a subscription to be able to unsubscribe from action. | ||
*/ | ||
Subscription schedule(Action0 action); | ||
|
||
/** | ||
* Schedules an action to be executed in dueTime. | ||
* | ||
* @param action action | ||
* @return a subscription to be able to unsubscribe from action. | ||
*/ | ||
Subscription schedule(Action0 action, long dueTime, TimeUnit unit); | ||
|
||
/** | ||
* Schedules a cancelable action to be executed in dueTime. | ||
* | ||
* @param action action | ||
* @return a subscription to be able to unsubscribe from action. | ||
*/ | ||
Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit); | ||
|
||
/** | ||
* Returns the scheduler's notion of current time. | ||
*/ | ||
long now(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/** | ||
* Copyright 2013 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package rx.concurrency; | ||
|
||
import rx.Scheduler; | ||
import rx.Subscription; | ||
import rx.subscriptions.Subscriptions; | ||
import rx.util.functions.Action0; | ||
import rx.util.functions.Func0; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
|
||
public abstract class AbstractScheduler implements Scheduler { | ||
|
||
@Override | ||
public Subscription schedule(Action0 action) { | ||
return schedule(asFunc0(action)); | ||
} | ||
|
||
@Override | ||
public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { | ||
return schedule(asFunc0(action), dueTime, unit); | ||
} | ||
|
||
@Override | ||
public long now() { | ||
return System.nanoTime(); | ||
} | ||
|
||
private static Func0<Subscription> asFunc0(final Action0 action) { | ||
return new Func0<Subscription>() { | ||
@Override | ||
public Subscription call() { | ||
action.call(); | ||
return Subscriptions.empty(); | ||
} | ||
}; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
/** | ||
* Copyright 2013 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package rx.concurrency; | ||
|
||
import org.junit.Test; | ||
import org.mockito.InOrder; | ||
import rx.Subscription; | ||
import rx.util.functions.Action0; | ||
import rx.util.functions.Func0; | ||
|
||
import java.util.LinkedList; | ||
import java.util.Queue; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.mockito.Mockito.*; | ||
|
||
public class CurrentThreadScheduler extends AbstractScheduler { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This took me a bit to grok but I think I got it and I believe it matches how Rx describes it:
http://msdn.microsoft.com/en-us/library/hh242963(v=vs.103).aspx This example also helped me: http://www.introtorx.com/content/v1.0.10621.0/15_SchedulingAndThreading.html#Current It looks like you might have gotten the unit test from there ... so I think this behaves correctly if I'm understanding it right. From what I can tell this scheduler is useful when doing nested calls (like the |
||
private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler(); | ||
|
||
public static CurrentThreadScheduler getInstance() { | ||
return INSTANCE; | ||
} | ||
|
||
private static final ThreadLocal<Queue<DiscardableAction>> QUEUE = new ThreadLocal<Queue<DiscardableAction>>(); | ||
|
||
private CurrentThreadScheduler() { | ||
} | ||
|
||
@Override | ||
public Subscription schedule(Func0<Subscription> action) { | ||
DiscardableAction discardableAction = new DiscardableAction(action); | ||
enqueue(discardableAction); | ||
return discardableAction; | ||
} | ||
|
||
@Override | ||
public Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit) { | ||
return schedule(new SleepingAction(action, this, dueTime, unit)); | ||
} | ||
|
||
private void enqueue(DiscardableAction action) { | ||
Queue<DiscardableAction> queue = QUEUE.get(); | ||
boolean exec = queue == null; | ||
|
||
if (exec) { | ||
queue = new LinkedList<DiscardableAction>(); | ||
QUEUE.set(queue); | ||
} | ||
|
||
queue.add(action); | ||
|
||
if (exec) { | ||
while (!queue.isEmpty()) { | ||
queue.poll().call(); | ||
} | ||
|
||
QUEUE.set(null); | ||
} | ||
} | ||
|
||
public static class UnitTest { | ||
|
||
@Test | ||
public void testNestedActions() { | ||
final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); | ||
|
||
final Action0 firstStepStart = mock(Action0.class); | ||
final Action0 firstStepEnd = mock(Action0.class); | ||
|
||
final Action0 secondStepStart = mock(Action0.class); | ||
final Action0 secondStepEnd = mock(Action0.class); | ||
|
||
final Action0 thirdStepStart = mock(Action0.class); | ||
final Action0 thirdStepEnd = mock(Action0.class); | ||
|
||
final Action0 firstAction = new Action0() { | ||
@Override | ||
public void call() { | ||
firstStepStart.call(); | ||
firstStepEnd.call(); | ||
} | ||
}; | ||
final Action0 secondAction = new Action0() { | ||
@Override | ||
public void call() { | ||
secondStepStart.call(); | ||
scheduler.schedule(firstAction); | ||
secondStepEnd.call(); | ||
|
||
} | ||
}; | ||
final Action0 thirdAction = new Action0() { | ||
@Override | ||
public void call() { | ||
thirdStepStart.call(); | ||
scheduler.schedule(secondAction); | ||
thirdStepEnd.call(); | ||
} | ||
}; | ||
|
||
InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); | ||
|
||
scheduler.schedule(thirdAction); | ||
|
||
inOrder.verify(thirdStepStart, times(1)).call(); | ||
inOrder.verify(thirdStepEnd, times(1)).call(); | ||
inOrder.verify(secondStepStart, times(1)).call(); | ||
inOrder.verify(secondStepEnd, times(1)).call(); | ||
inOrder.verify(firstStepStart, times(1)).call(); | ||
inOrder.verify(firstStepEnd, times(1)).call(); | ||
} | ||
|
||
@Test | ||
public void testSequenceOfActions() { | ||
final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); | ||
|
||
final Action0 first = mock(Action0.class); | ||
final Action0 second = mock(Action0.class); | ||
|
||
scheduler.schedule(first); | ||
scheduler.schedule(second); | ||
|
||
verify(first, times(1)).call(); | ||
verify(second, times(1)).call(); | ||
|
||
} | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice.
I think I like having this at
rx.Scheduler
instead ofrx.concurrency.Scheduler
as Rx.Net did it.Does anyone have a reason for us not to put it here as a peer to the other top-level objects in rx.*?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would need some additional methods to support the
Interval
operation #55 .