Skip to content

Commit 955bc50

Browse files
committed
First implementation of OperatorWeakBinding
1 parent cbbf514 commit 955bc50

File tree

3 files changed

+99
-0
lines changed

3 files changed

+99
-0
lines changed

rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@
1515
*/
1616
package rx.android.observables;
1717

18+
import static rx.android.schedulers.AndroidSchedulers.mainThread;
19+
1820
import rx.Observable;
1921
import rx.operators.OperatorObserveFromAndroidComponent;
22+
import rx.operators.OperatorWeakBinding;
23+
2024
import android.app.Activity;
2125
import android.app.Fragment;
2226
import android.os.Build;
@@ -58,6 +62,7 @@ private AndroidObservable() {}
5862
* @param <T>
5963
* @return a new observable sequence that will emit notifications on the main UI thread
6064
*/
65+
@Deprecated
6166
public static <T> Observable<T> fromActivity(Activity activity, Observable<T> sourceObservable) {
6267
return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, activity);
6368
}
@@ -86,6 +91,7 @@ public static <T> Observable<T> fromActivity(Activity activity, Observable<T> so
8691
* @param <T>
8792
* @return a new observable sequence that will emit notifications on the main UI thread
8893
*/
94+
@Deprecated
8995
public static <T> Observable<T> fromFragment(Object fragment, Observable<T> sourceObservable) {
9096
if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) {
9197
return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, (android.support.v4.app.Fragment) fragment);
@@ -95,4 +101,21 @@ public static <T> Observable<T> fromFragment(Object fragment, Observable<T> sour
95101
throw new IllegalArgumentException("Target fragment is neither a native nor support library Fragment");
96102
}
97103
}
104+
105+
public static <T> Observable<T> bindActivity(Activity activity, Observable<T> cachedSequence) {
106+
return cachedSequence.observeOn(mainThread()).lift(new OperatorWeakBinding<T, Activity>(activity));
107+
}
108+
109+
public static <T> Observable<T> bindFragment(Object fragment, Observable<T> cachedSequence) {
110+
Observable<T> source = cachedSequence.observeOn(mainThread());
111+
if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) {
112+
android.support.v4.app.Fragment f = (android.support.v4.app.Fragment) fragment;
113+
return source.lift(new OperatorWeakBinding<T, android.support.v4.app.Fragment>(f));
114+
} else if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.HONEYCOMB && fragment instanceof Fragment) {
115+
Fragment f = (Fragment) fragment;
116+
return source.lift(new OperatorWeakBinding<T, Fragment>(f));
117+
} else {
118+
throw new IllegalArgumentException("Target fragment is neither a native nor support library Fragment");
119+
}
120+
}
98121
}

rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorObserveFromAndroidComponent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import android.os.Looper;
2626
import android.util.Log;
2727

28+
@Deprecated
2829
public class OperatorObserveFromAndroidComponent {
2930

3031
public static <T> Observable<T> observeFromAndroidComponent(Observable<T> source, android.app.Fragment fragment) {
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package rx.operators;
2+
3+
import rx.Observable;
4+
import rx.Subscriber;
5+
6+
import android.util.Log;
7+
8+
import java.lang.ref.WeakReference;
9+
10+
public final class OperatorWeakBinding<T, R> implements Observable.Operator<T, T> {
11+
12+
private static final String LOG_TAG = "WeakBinding";
13+
14+
private final WeakReference<R> boundRef;
15+
16+
public OperatorWeakBinding(R bound) {
17+
boundRef = new WeakReference<R>(bound);
18+
}
19+
20+
@Override
21+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
22+
return new WeakSubscriber<T, R>(child, boundRef);
23+
}
24+
25+
private static final class WeakSubscriber<T, R> extends Subscriber<T> {
26+
27+
private final WeakReference<Subscriber<? super T>> subscriberRef;
28+
private final WeakReference<R> boundRef;
29+
30+
private WeakSubscriber(Subscriber<? super T> op, WeakReference<R> boundRef) {
31+
subscriberRef = new WeakReference<Subscriber<? super T>>(op);
32+
this.boundRef = boundRef;
33+
}
34+
35+
@Override
36+
public void onCompleted() {
37+
Subscriber<? super T> sub = subscriberRef.get();
38+
if (sub != null && boundRef.get() != null) {
39+
sub.onCompleted();
40+
} else {
41+
handleLostBinding(sub, "onCompleted");
42+
}
43+
}
44+
45+
@Override
46+
public void onError(Throwable e) {
47+
Subscriber<? super T> sub = subscriberRef.get();
48+
if (sub != null && boundRef.get() != null) {
49+
sub.onError(e);
50+
} else {
51+
handleLostBinding(sub, "onError");
52+
}
53+
}
54+
55+
@Override
56+
public void onNext(T t) {
57+
Subscriber<? super T> sub = subscriberRef.get();
58+
if (sub != null && boundRef.get() != null) {
59+
sub.onNext(t);
60+
} else {
61+
handleLostBinding(sub, "onNext");
62+
}
63+
}
64+
65+
private void handleLostBinding(Subscriber<? super T> sub, String context) {
66+
if (sub == null) {
67+
Log.d(LOG_TAG, "subscriber gone; skipping " + context);
68+
} else {
69+
Log.d(LOG_TAG, "bound component gone; skipping " + context);
70+
}
71+
unsubscribe();
72+
}
73+
74+
}
75+
}

0 commit comments

Comments
 (0)