44import android .support .v7 .app .AppCompatActivity ;
55import android .util .Log ;
66
7+ import java .util .concurrent .TimeUnit ;
8+
79import io .reactivex .Observable ;
8- import io .reactivex .functions .Consumer ;
9- import io .reactivex .functions .Predicate ;
10+ import io .reactivex .ObservableEmitter ;
11+ import io .reactivex .ObservableOnSubscribe ;
12+ import io .reactivex .Observer ;
13+ import io .reactivex .disposables .Disposable ;
1014
1115
1216public class MainActivity extends AppCompatActivity {
@@ -18,6 +22,144 @@ protected void onCreate(Bundle savedInstanceState) {
1822 setContentView (R .layout .activity_main );
1923
2024
25+
26+ Observable .create (new ObservableOnSubscribe <Integer >() {
27+ @ Override
28+ public void subscribe (ObservableEmitter <Integer > e ) throws Exception {
29+ // 每隔0.3s发送一个事件
30+ Thread .sleep (300 );
31+ e .onNext (1 );
32+
33+ Thread .sleep (300 );
34+ e .onNext (2 );
35+
36+ Thread .sleep (300 );
37+ e .onNext (3 );
38+
39+ Thread .sleep (300 );
40+ e .onNext (4 );
41+
42+ Thread .sleep (300 );
43+ e .onNext (5 );
44+
45+ Thread .sleep (300 );
46+ e .onNext (6 );
47+
48+ Thread .sleep (300 );
49+ e .onNext (7 );
50+
51+ Thread .sleep (300 );
52+ e .onNext (8 );
53+
54+ Thread .sleep (300 );
55+ e .onNext (9 );
56+
57+ Thread .sleep (300 );
58+ e .onComplete ();
59+ }
60+ }).sample (1 , TimeUnit .SECONDS )// 每隔1s获取Observable最近发送的事件
61+ .subscribe (new Observer <Integer >() {
62+ @ Override
63+ public void onSubscribe (Disposable d ) {
64+ Log .d (TAG , "开始采用subscribe连接" );
65+ }
66+
67+ @ Override
68+ public void onNext (Integer value ) {
69+ Log .d (TAG , "接收到了事件" + value );
70+ }
71+
72+ @ Override
73+ public void onError (Throwable e ) {
74+ Log .d (TAG , "对Error事件作出响应" );
75+ }
76+
77+ @ Override
78+ public void onComplete () {
79+ Log .d (TAG , "对Complete事件作出响应" );
80+ }
81+ });
82+
83+ //
84+ // Observable.sequenceEqual(
85+ // Observable.just(4,5,6),
86+ // Observable.just(4,5,6)
87+ // )
88+ // .subscribe(new Consumer<Boolean>() {
89+ // @Override
90+ // public void accept( Boolean aBoolean) throws Exception {
91+ // Log.d(TAG,"2个Observable是否相同:"+ aBoolean);
92+ // // 输出返回结果
93+ // }
94+ // });
95+
96+
97+ // // (原始)第1个Observable:每隔1s发送1个数据 = 从0开始,每次递增1
98+ // Observable.interval(1, TimeUnit.SECONDS)
99+ // // 第2个Observable:延迟5s后开始发送1个Long型数据
100+ // .skipUntil(Observable.timer(5, TimeUnit.SECONDS))
101+ // .subscribe(new Observer<Long>() {
102+ // @Override
103+ // public void onSubscribe(Disposable d) {
104+ // Log.d(TAG, "开始采用subscribe连接");
105+ // }
106+ //
107+ // @Override
108+ // public void onNext(Long value) {
109+ // Log.d(TAG, "接收到了事件"+ value );
110+ // }
111+ //
112+ // @Override
113+ // public void onError(Throwable e) {
114+ // Log.d(TAG, "对Error事件作出响应");
115+ // }
116+ //
117+ // @Override
118+ // public void onComplete() {
119+ // Log.d(TAG, "对Complete事件作出响应");
120+ // }
121+ //
122+ // });
123+
124+
125+
126+
127+
128+
129+ //
130+ // // 1. 每1s发送1个数据 = 从0开始,递增1,即0、1、2、3
131+ // Observable.interval(1, TimeUnit.SECONDS)
132+ // // 2. 通过takeWhile传入一个判断条件
133+ // .takeWhile(new Predicate<Long>(){
134+ // @Override
135+ // public boolean test( Long integer) throws Exception {
136+ // return (integer<3);
137+ // // 当发送的数据满足<3时,才发送Observable的数据
138+ // }
139+ // }).subscribe(new Observer<Long>() {
140+ // @Override
141+ // public void onSubscribe(Disposable d) {
142+ // }
143+ //
144+ // @Override
145+ // public void onNext(Long value) {
146+ // Log.d(TAG,"发送了事件 "+ value);
147+ // }
148+ //
149+ // @Override
150+ // public void onError(Throwable e) {
151+ // }
152+ //
153+ // @Override
154+ // public void onComplete() {
155+ // }
156+ // });
157+
158+
159+
160+
161+
162+
21163// Observable.just(1,2,3,4,5,6)
22164// .map(new Function<Integer,String>() {
23165// @Override
@@ -30,23 +172,10 @@ protected void onCreate(Bundle savedInstanceState) {
30172
31173
32174
175+ }
33176
34177
35- Observable .just (1 ,2 ,3 ,4 ,5 ,6 )
36- .all (new Predicate <Integer >(){
37- @ Override
38- public boolean test ( Integer integer ) throws Exception {
39- return (integer <=10 );
40- // 该函数用于判断Observable发送的10个数据是否都满足integer<=10
41- }
42- }).subscribe (new Consumer <Boolean >() {
43- @ Override
44- public void accept (Boolean aBoolean ) throws Exception {
45- Log .d (TAG ,"result is " + aBoolean );
46- // 输出返回结果
47- }
48178
49- });
50179
51180// Observable.just(1,2,3,4,5,6)
52181// .all(new Func2<Integer, Boolean>() {
@@ -138,6 +267,6 @@ public void accept(Boolean aBoolean) throws Exception {
138267// });
139268 }
140269
141- }
270+
142271
143272
0 commit comments