Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SubscribeOn/ObserveOn Implementation #199

Closed
wants to merge 13 commits into from
71 changes: 45 additions & 26 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,7 @@
import org.mockito.MockitoAnnotations;

import rx.observables.GroupedObservable;
import rx.operators.OperationConcat;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationFilter;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMostRecent;
import rx.operators.OperationNext;
import rx.operators.OperationOnErrorResumeNextViaFunction;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationScan;
import rx.operators.OperationSkip;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTake;
import rx.operators.OperationTakeLast;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
import rx.operators.OperationToObservableList;
import rx.operators.OperationToObservableSortedList;
import rx.operators.OperationZip;
import rx.operators.OperatorGroupBy;
import rx.operators.OperatorTakeUntil;
import rx.operators.OperatorToIterator;
import rx.operators.*;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.Subscriptions;
Expand Down Expand Up @@ -766,6 +741,30 @@ public static Observable<Integer> range(int start, int count) {
return from(Range.createWithCount(start, count));
}

/**
* Asynchronously subscribes and unsubscribes observers on the specified scheduler.
*
* @param source the source observable.
* @param scheduler the scheduler to perform subscription and unsubscription actions on.
* @param <T> the type of observable.
* @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
public static <T> Observable<T> subscribeOn(Observable<T> source, Scheduler scheduler) {
return _create(OperationSubscribeOn.subscribeOn(source, scheduler));
}

/**
* Asynchronously notify observers on the specified scheduler.
*
* @param source the source observable.
* @param scheduler the scheduler to notify observers on.
* @param <T> the type of observable.
* @return the source sequence whose observations happen on the specified scheduler.
*/
public static <T> Observable<T> observeOn(Observable<T> source, Scheduler scheduler) {
return _create(OperationObserveOn.observeOn(source, scheduler));
}

/**
* Returns an observable sequence that invokes the observable factory whenever a new observer subscribes.
* The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer
Expand Down Expand Up @@ -2589,6 +2588,26 @@ public Observable<Notification<T>> materialize() {
return materialize(this);
}

/**
* Asynchronously subscribes and unsubscribes observers on the specified scheduler.
*
* @param scheduler the scheduler to perform subscription and unsubscription actions on.
* @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
public Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(this, scheduler);
}

/**
* Asynchronously notify observers on the specified scheduler.
*
* @param scheduler the scheduler to notify observers on.
* @return the source sequence whose observations happen on the specified scheduler.
*/
public Observable<T> observeOn(Scheduler scheduler) {
return observeOn(this, scheduler);
}

/**
* Dematerializes the explicit notification values of an observable sequence as implicit notifications.
*
Expand Down
65 changes: 65 additions & 0 deletions rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;

import rx.util.functions.Action0;
import rx.util.functions.Func0;

import java.util.concurrent.TimeUnit;

/**
* Represents an object that schedules units of work.
*/
public interface Scheduler {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice.

I think I like having this at rx.Scheduler instead of rx.concurrency.Scheduler as Rx.Net did it.

Does anyone have a reason for us not to put it here as a peer to the other top-level objects in rx.*?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need some additional methods to support the Interval operation #55 .


/**
* Schedules a cancelable action to be executed.
*
* @param action action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Func0<Subscription> action);

/**
* Schedules an action to be executed.
*
* @param action action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Action0 action);

/**
* Schedules an action to be executed in dueTime.
*
* @param action action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Action0 action, long dueTime, TimeUnit unit);

/**
* Schedules a cancelable action to be executed in dueTime.
*
* @param action action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit);

/**
* Returns the scheduler's notion of current time.
*/
long now();

}
53 changes: 53 additions & 0 deletions rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.concurrency;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;

import java.util.concurrent.TimeUnit;

public abstract class AbstractScheduler implements Scheduler {

@Override
public Subscription schedule(Action0 action) {
return schedule(asFunc0(action));
}

@Override
public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
return schedule(asFunc0(action), dueTime, unit);
}

@Override
public long now() {
return System.nanoTime();
}

private static Func0<Subscription> asFunc0(final Action0 action) {
return new Func0<Subscription>() {
@Override
public Subscription call() {
action.call();
return Subscriptions.empty();
}
};
}

}
142 changes: 142 additions & 0 deletions rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.concurrency;

import org.junit.Test;
import org.mockito.InOrder;
import rx.Subscription;
import rx.util.functions.Action0;
import rx.util.functions.Func0;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.*;

public class CurrentThreadScheduler extends AbstractScheduler {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This took me a bit to grok but I think I got it and I believe it matches how Rx describes it:

The CurrentThreadScheduler (by accessing the static CurrentThread property) will schedule actions to be performed on the thread that makes the original call. The action is not executed immediately, but is placed in a queue and only executed after the current action is complete.

http://msdn.microsoft.com/en-us/library/hh242963(v=vs.103).aspx

This example also helped me: http://www.introtorx.com/content/v1.0.10621.0/15_SchedulingAndThreading.html#Current

It looks like you might have gotten the unit test from there ... so I think this behaves correctly if I'm understanding it right.

From what I can tell this scheduler is useful when doing nested calls (like the repeat operator) to be a "trampoline" and allow recursion without overflowing the stack ... correct?

private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler();

public static CurrentThreadScheduler getInstance() {
return INSTANCE;
}

private static final ThreadLocal<Queue<DiscardableAction>> QUEUE = new ThreadLocal<Queue<DiscardableAction>>();

private CurrentThreadScheduler() {
}

@Override
public Subscription schedule(Func0<Subscription> action) {
DiscardableAction discardableAction = new DiscardableAction(action);
enqueue(discardableAction);
return discardableAction;
}

@Override
public Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit) {
return schedule(new SleepingAction(action, this, dueTime, unit));
}

private void enqueue(DiscardableAction action) {
Queue<DiscardableAction> queue = QUEUE.get();
boolean exec = queue == null;

if (exec) {
queue = new LinkedList<DiscardableAction>();
QUEUE.set(queue);
}

queue.add(action);

if (exec) {
while (!queue.isEmpty()) {
queue.poll().call();
}

QUEUE.set(null);
}
}

public static class UnitTest {

@Test
public void testNestedActions() {
final CurrentThreadScheduler scheduler = new CurrentThreadScheduler();

final Action0 firstStepStart = mock(Action0.class);
final Action0 firstStepEnd = mock(Action0.class);

final Action0 secondStepStart = mock(Action0.class);
final Action0 secondStepEnd = mock(Action0.class);

final Action0 thirdStepStart = mock(Action0.class);
final Action0 thirdStepEnd = mock(Action0.class);

final Action0 firstAction = new Action0() {
@Override
public void call() {
firstStepStart.call();
firstStepEnd.call();
}
};
final Action0 secondAction = new Action0() {
@Override
public void call() {
secondStepStart.call();
scheduler.schedule(firstAction);
secondStepEnd.call();

}
};
final Action0 thirdAction = new Action0() {
@Override
public void call() {
thirdStepStart.call();
scheduler.schedule(secondAction);
thirdStepEnd.call();
}
};

InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd);

scheduler.schedule(thirdAction);

inOrder.verify(thirdStepStart, times(1)).call();
inOrder.verify(thirdStepEnd, times(1)).call();
inOrder.verify(secondStepStart, times(1)).call();
inOrder.verify(secondStepEnd, times(1)).call();
inOrder.verify(firstStepStart, times(1)).call();
inOrder.verify(firstStepEnd, times(1)).call();
}

@Test
public void testSequenceOfActions() {
final CurrentThreadScheduler scheduler = new CurrentThreadScheduler();

final Action0 first = mock(Action0.class);
final Action0 second = mock(Action0.class);

scheduler.schedule(first);
scheduler.schedule(second);

verify(first, times(1)).call();
verify(second, times(1)).call();

}

}
}
Loading