Skip to content

1.x: enable operator/source fusion by named operator lifter #3506

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 26 additions & 30 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,36 +145,14 @@ public void call(Subscriber<? super T> subscriber) {
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the value type after the transformation by the operator
* @param operator the Operator that implements the Observable-operating function to be applied to the source
* Observable
* @return an Observable that is the result of applying the lifted Operator to the source Observable
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
*/
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
return create(new OnSubscribeLift<T, R>(this.onSubscribe, operator));
}

/**
Expand Down Expand Up @@ -5813,7 +5791,25 @@ public final Observable<Notification<T>> materialize() {
* @return an Observable that emits all of the items emitted by the source Observables
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@SuppressWarnings("unchecked")
public final Observable<T> mergeWith(Observable<? extends T> t1) {
if (this.onSubscribe instanceof OnSubscribeLift) {
OnSubscribeLift lifted = (OnSubscribeLift) this.onSubscribe;
if ((lifted.operator() instanceof OperatorMerge) && (lifted.source() instanceof OnSubscribeFromIterable)) {
OnSubscribeFromIterable<Observable<? extends T>> iter = (OnSubscribeFromIterable<Observable<? extends T>>)lifted.source();
Iterable<? extends Observable<? extends T>> it = iter.iterable();
if (it instanceof List) {
List<? extends Observable<? extends T>> lit = (List<? extends Observable<? extends T>>) it;
List<Observable<? extends T>> newList = new ArrayList<Observable<? extends T>>(lit.size() + 1);
for (Observable<? extends T> t : lit) {
newList.add(t);
}
newList.add(t1);

return merge(from(newList));
}
}
}
return merge(this, t1);
}

Expand Down Expand Up @@ -8167,7 +8163,7 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
// allow the HOOK to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
Expand All @@ -8181,9 +8177,9 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
// TODO could the HOOK be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
// TODO why aren't we throwing the HOOK's return value.
throw r;
}
return Subscriptions.unsubscribed();
Expand Down Expand Up @@ -8260,7 +8256,7 @@ private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Obse
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
// allow the HOOK to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
Expand All @@ -8274,9 +8270,9 @@ private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Obse
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
// TODO could the HOOK be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
// TODO why aren't we throwing the HOOK's return value.
throw r;
}
return Subscriptions.unsubscribed();
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private Single(final Observable.OnSubscribe<T> f) {
* @see <a href="http://reactivex.io/documentation/operators/create.html">ReactiveX operators documentation: Create</a>
*/
public final static <T> Single<T> create(OnSubscribe<T> f) {
return new Single<T>(f); // TODO need hook
return new Single<T>(f); // TODO need HOOK
}

/**
Expand Down Expand Up @@ -1492,8 +1492,8 @@ public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// TODO add back the hook
// hook.onSubscribeStart(this, onSubscribe).call(subscriber);
// TODO add back the HOOK
// HOOK.onSubscribeStart(this, onSubscribe).call(subscriber);
onSubscribe.call(subscriber);
hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
Expand All @@ -1507,9 +1507,9 @@ public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
// TODO could the HOOK be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
// TODO why aren't we throwing the HOOK's return value.
throw r;
}
}
Expand Down Expand Up @@ -1578,9 +1578,9 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {

// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
try {
// allow the hook to intercept and/or decorate
// TODO add back the hook
// hook.onSubscribeStart(this, onSubscribe).call(subscriber);
// allow the HOOK to intercept and/or decorate
// TODO add back the HOOK
// HOOK.onSubscribeStart(this, onSubscribe).call(subscriber);
onSubscribe.call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
Expand All @@ -1594,9 +1594,9 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
// TODO could the HOOK be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
// TODO why aren't we throwing the HOOK's return value.
throw r;
}
return Subscriptions.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
* <p>
* You can convert any object that supports the Iterable interface into an Observable that emits each item in
* the object, with the {@code toObservable} operation.
*
* @param <T> the value type
*/
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {

Expand All @@ -39,6 +41,10 @@ public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
}
this.is = iterable;
}

public Iterable<? extends T> iterable() {
return is;
}

@Override
public void call(final Subscriber<? super T> o) {
Expand Down
99 changes: 99 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeLift.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.operators;

import rx.*;
import rx.Observable.*;
import rx.exceptions.Exceptions;
import rx.plugins.*;

/**
* Applies an operator to the incoming child Subscriber and subscribes
* the resulting Subscriber to a source Observable.
* <p>
* By turning the original lift from an anonymous class into a named class,
* operator optimizations can now look at the graph and discover the
* operators and sources.
*
* @param <T> the source value type
* @param <R> the result value type;
*/
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
/** The operator. */
final Operator<? extends R, ? super T> operator;
/** The upstream. */
final OnSubscribe<? extends T> source;
/** The callback hook to transform the operator if necessary. */
static final RxJavaObservableExecutionHook HOOK =
RxJavaPlugins.getInstance().getObservableExecutionHook();

/**
* Constructs an OnSubscribeLift instance with the given source and operators.
* <p>
* The constructor has to take in an OnSubscribe instead of an Observable, unfortunately,
* because the subscribe/unsafeSubscribe activities would interfere (double onStart,
* double wrapping by hooks, etc).
* @param source the source OnSubscribe
* @param operator the operator to apply on the child subscribers to get a Subscriber for source
*/
public OnSubscribeLift(OnSubscribe<? extends T> source, Operator<? extends R, ? super T> operator) {
this.operator = operator;
this.source = source;
}

/**
* Returns the operator instance of this lifting OnSubscribe.
* @return the operator instance of this lifting OnSubscribe
*/
public Operator<? extends R, ? super T> operator() {
return operator;
}

/**
* Returns the source OnSubscribe of this OnSubscribe.
* @return the source OnSubscribe of this OnSubscribe
*/
public OnSubscribe<? extends T> source() {
return source;
}

@Override
public void call(Subscriber<? super R> child) {
try {
Operator<? extends R, ? super T> onLift = HOOK.onLift(operator);

Subscriber<? super T> st = onLift.call(child);

try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
source.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
child.onError(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ExactSubscriber(Subscriber<? super Observable<T>> child) {
*/
this.child = child;
/*
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
* Add unsubscribe HOOK to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
*/
}
void init() {
Expand Down Expand Up @@ -156,7 +156,7 @@ public InexactSubscriber(Subscriber<? super Observable<T>> child) {

void init() {
/*
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
* Add unsubscribe HOOK to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
*/
child.add(Subscriptions.create(new Action0() {

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/rx/plugins/RxJavaSchedulersHook.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
* the 3 methods that return Scheduler (io(), computation(), newThread()).
* 2. You may wrap/decorate an {@link Action0}, before it is handed off to a Scheduler. The system-
* supplied Schedulers (Schedulers.ioScheduler, Schedulers.computationScheduler,
* Scheduler.newThreadScheduler) all use this hook, so it's a convenient way to
* Scheduler.newThreadScheduler) all use this HOOK, so it's a convenient way to
* modify Scheduler functionality without redefining Schedulers wholesale.
*
* Also, when redefining Schedulers, you are free to use/not use the onSchedule decoration hook.
* Also, when redefining Schedulers, you are free to use/not use the onSchedule decoration HOOK.
* <p>
* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins:
* <a href="https://github.com/ReactiveX/RxJava/wiki/Plugins">https://github.com/ReactiveX/RxJava/wiki/Plugins</a>.
Expand Down
50 changes: 50 additions & 0 deletions src/test/java/rx/internal/operators/OperatorMergeWithTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.operators;

import org.junit.Test;

import rx.Observable;
import rx.observers.TestSubscriber;

public class OperatorMergeWithTest {
@Test
public void mergeLargeAmountOfSources() {
Observable<Integer> source = Observable.range(1, 2);

Observable<Integer> result = source;
int n = 5000;

for (int i = 0; i < n; i++) {
result = result.mergeWith(source);
}

TestSubscriber<Integer> ts = TestSubscriber.create();

long t = System.nanoTime();

result.subscribe(ts);

ts.assertValueCount((n + 1) * 2);
ts.assertNoErrors();
ts.assertCompleted();

t = System.nanoTime() - t;

System.out.printf("Merging took: %,d ns%n", t);
}
}