Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove subscribe(Map<String, Object>) and cleanup Functions.from #3

Merged
merged 2 commits into from
Aug 29, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 6 additions & 55 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import rx.util.OnErrorNotImplementedException;
import rx.util.Range;
import rx.util.Timestamped;
import rx.util.functions.Action;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
Expand Down Expand Up @@ -249,56 +250,6 @@ private Subscription protectivelyWrapAndSubscribe(Observer<T> o) {
return subscription.wrap(subscribe(new SafeObserver<T>(subscription, o)));
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Map<String, Object> callbacks) {
if (callbacks == null) {
throw new RuntimeException("callbacks map can not be null");
}
Object _onNext = callbacks.get("onNext");
if (_onNext == null) {
throw new RuntimeException("'onNext' key must contain an implementation");
}
// lookup and memoize onNext
final FuncN onNext = Functions.from(_onNext);

/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {

@Override
public void onCompleted() {
Object onComplete = callbacks.get("onCompleted");
if (onComplete != null) {
Functions.from(onComplete).call();
}
}

@Override
public void onError(Throwable e) {
handleError(e);
Object onError = callbacks.get("onError");
if (onError != null) {
Functions.from(onError).call(e);
} else {
throw new OnErrorNotImplementedException(e);
}
}

@Override
public void onNext(Object args) {
onNext.call(args);
}

});
}

public Subscription subscribe(final Map<String, Object> callbacks, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(callbacks);
}

public Subscription subscribe(final Action1<T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
Expand Down Expand Up @@ -1086,13 +1037,13 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observabl
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png">
*
*
* @param w0
* The first source observable.
* The first source observable.
* @param w1
* The second source observable.
* The second source observable.
* @param combineFunction
* The aggregation function used to combine the source observable values.
* The aggregation function used to combine the source observable values.
* @return An Observable that combines the source Observables with the given combine function
*/
public static <R, T0, T1> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineFunction) {
Expand All @@ -1112,7 +1063,7 @@ public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<T0> w0, Obs
public static <R, T0, T1, T2, T3> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<T0, T1, T2, T3, R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction));
}

/**
* Creates an Observable which produces buffers of collected values.
*
Expand Down
18 changes: 1 addition & 17 deletions rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import rx.Subscription;
import rx.util.functions.Action0;
import rx.util.functions.FuncN;
import rx.util.functions.Function;
import rx.util.functions.Functions;

/**
Expand Down Expand Up @@ -83,23 +84,6 @@ public static CompositeSubscription create(Subscription... subscriptions) {
return new CompositeSubscription(subscriptions);
}

/**
* A {@link Subscription} implemented via an anonymous function (such as closures from other languages).
*
* @return {@link Subscription}
*/
public static Subscription create(final Object unsubscribe) {
final FuncN<?> f = Functions.from(unsubscribe);
return new Subscription() {

@Override
public void unsubscribe() {
f.call();
}

};
}

/**
* A {@link Subscription} that does nothing when its unsubscribe method is called.
*/
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/functions/Action.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
* <p>
* Marker interface to allow instanceof checks.
*/
public interface Action {
public interface Action extends Function {

}
13 changes: 2 additions & 11 deletions rxjava-core/src/main/java/rx/util/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package rx.util.functions;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;

public class Functions {

/**
Expand All @@ -26,17 +23,11 @@ public class Functions {
* @param function
*/
@SuppressWarnings({ "rawtypes" })
public static FuncN from(final Object function) {
public static FuncN from(final Function function) {
if (function == null) {
throw new RuntimeException("function is null. Can't send arguments to null function.");
}

/* check for typed Rx Function implementation first */
if (function instanceof Function) {
return fromFunction((Function) function);
}
// no support found
throw new RuntimeException("Unsupported closure type: " + function.getClass().getSimpleName());
return fromFunction(function);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down