4
4
import android .databinding .ObservableField ;
5
5
import android .support .annotation .NonNull ;
6
6
7
+ import rx .AsyncEmitter ;
7
8
import rx .Observable ;
8
- import rx .Subscriber ;
9
- import rx .functions .Action0 ;
10
- import rx .subscriptions .Subscriptions ;
11
9
12
10
import static android .databinding .Observable .OnPropertyChangedCallback ;
13
11
@@ -17,29 +15,21 @@ private RxUtils() {
17
15
}
18
16
19
17
public static <T > Observable <T > toObservable (@ NonNull final ObservableField <T > observableField ) {
20
- return Observable .create (new Observable .OnSubscribe <T >() {
21
- @ Override
22
- public void call (final Subscriber <? super T > subscriber ) {
23
- subscriber .onNext (observableField .get ());
24
-
25
- final OnPropertyChangedCallback callback = new OnPropertyChangedCallback () {
26
- @ Override
27
- public void onPropertyChanged (android .databinding .Observable dataBindingObservable , int propertyId ) {
28
- if (dataBindingObservable == observableField ) {
29
- subscriber .onNext (observableField .get ());
30
- }
18
+ return Observable .fromEmitter (asyncEmitter -> {
19
+
20
+ final OnPropertyChangedCallback callback = new OnPropertyChangedCallback () {
21
+ @ Override
22
+ public void onPropertyChanged (android .databinding .Observable dataBindingObservable , int propertyId ) {
23
+ if (dataBindingObservable == observableField ) {
24
+ asyncEmitter .onNext (observableField .get ());
31
25
}
32
- };
26
+ }
27
+ };
33
28
34
- observableField .addOnPropertyChangedCallback (callback );
29
+ observableField .addOnPropertyChangedCallback (callback );
35
30
36
- subscriber .add (Subscriptions .create (new Action0 () {
37
- @ Override
38
- public void call () {
39
- observableField .removeOnPropertyChangedCallback (callback );
40
- }
41
- }));
42
- }
43
- });
31
+ asyncEmitter .setCancellation (() -> observableField .removeOnPropertyChangedCallback (callback ));
32
+
33
+ }, AsyncEmitter .BackpressureMode .LATEST );
44
34
}
45
35
}
0 commit comments