Skip to content

Commit deda0ad

Browse files
committed
add establishUsage
1 parent 5fc0305 commit deda0ad

File tree

3 files changed

+246
-32
lines changed

3 files changed

+246
-32
lines changed

app/src/main/AndroidManifest.xml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@
99
android:roundIcon="@mipmap/ic_launcher_round"
1010
android:supportsRtl="true"
1111
android:theme="@style/AppTheme">
12-
<activity android:name=".MainActivity">
12+
<activity android:name=".establishUsage">
1313
<intent-filter>
1414
<action android:name="android.intent.action.MAIN" />
1515

1616
<category android:name="android.intent.category.LAUNCHER" />
1717
</intent-filter>
1818
</activity>
19+
20+
<activity android:name=".MainActivity">
21+
</activity>
1922
</application>
2023

2124
</manifest>

app/src/main/java/scut/carson_ho/rxjava_operators/MainActivity.java

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,16 @@
44
import android.support.v7.app.AppCompatActivity;
55
import android.util.Log;
66

7+
import java.util.ArrayList;
78
import java.util.List;
9+
import java.util.concurrent.TimeUnit;
810

911
import io.reactivex.Observable;
10-
import io.reactivex.Observer;
11-
import io.reactivex.disposables.Disposable;
12+
import io.reactivex.ObservableEmitter;
13+
import io.reactivex.ObservableOnSubscribe;
14+
import io.reactivex.ObservableSource;
15+
import io.reactivex.functions.Consumer;
16+
import io.reactivex.functions.Function;
1217

1318

1419
public class MainActivity extends AppCompatActivity {
@@ -20,35 +25,33 @@ protected void onCreate(Bundle savedInstanceState) {
2025
super.onCreate(savedInstanceState);
2126
setContentView(R.layout.activity_main);
2227

23-
// 被观察者 需要发送5个数字
24-
Observable.just(1, 2, 3, 4, 5)
25-
.buffer(3, 1) // 设置缓存区大小 & 步长
26-
// 缓存区大小 = 每次从被观察者中获取的事件数量
27-
// 步长 = 每次获取新事件的数量
28-
.subscribe(new Observer<List<Integer>>() {
29-
@Override
30-
public void onSubscribe(Disposable d) {
31-
32-
}
33-
@Override
34-
public void onNext(List<Integer> stringList) {
35-
//
36-
Log.d(TAG, " 缓存区里的事件数量 = " + stringList.size());
37-
for (Integer value : stringList) {
38-
Log.d(TAG, " 事件 = " + value);
39-
}
40-
}
41-
42-
@Override
43-
public void onError(Throwable e) {
44-
Log.d(TAG, "对Error事件作出响应" );
45-
}
46-
47-
@Override
48-
public void onComplete() {
49-
Log.d(TAG, "对Complete事件作出响应");
50-
}
51-
});
28+
// 采用RxJava基于事件流的链式操作
29+
Observable.create(new ObservableOnSubscribe<Integer>() {
30+
@Override
31+
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
32+
emitter.onNext(1);
33+
emitter.onNext(2);
34+
emitter.onNext(3);
35+
}
36+
37+
// 采用flatMap()变换操作符
38+
}).flatMap(new Function<Integer, ObservableSource<String>>() {
39+
@Override
40+
public ObservableSource<String> apply(Integer integer) throws Exception {
41+
final List<String> list = new ArrayList<>();
42+
for (int i = 0; i < 3; i++) {
43+
list.add("我是事件 " + integer + "拆分后的子事件" + i);
44+
// 通过flatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件
45+
// 最终合并,再发送给被观察者
46+
}
47+
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
48+
}
49+
}).subscribe(new Consumer<String>() {
50+
@Override
51+
public void accept(String s) throws Exception {
52+
Log.d(TAG, s);
53+
}
54+
});
5255

5356

5457

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
package scut.carson_ho.rxjava_operators;
2+
3+
import android.os.Bundle;
4+
import android.support.v7.app.AppCompatActivity;
5+
import android.util.Log;
6+
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
10+
import io.reactivex.Observable;
11+
import io.reactivex.Observer;
12+
import io.reactivex.disposables.Disposable;
13+
14+
/**
15+
* Created by Carson_Ho on 17/9/6.
16+
*/
17+
18+
public class establishUsage extends AppCompatActivity {
19+
20+
private String TAG = "RxJava";
21+
Integer i = 10;
22+
23+
@Override
24+
protected void onCreate(Bundle savedInstanceState) {
25+
super.onCreate(savedInstanceState);
26+
setContentView(R.layout.activity_main);
27+
28+
29+
/*
30+
* 数组遍历
31+
**/
32+
33+
// 1. 设置需要传入的数组
34+
Integer[] items = { 0, 1, 2, 3, 4 };
35+
36+
// 2. 创建被观察者对象(Observable)时传入数组
37+
// 在创建后就会将该数组转换成Observable & 发送该对象中的所有数据
38+
Observable.fromArray(items)
39+
.subscribe(new Observer<Integer>() {
40+
@Override
41+
public void onSubscribe(Disposable d) {
42+
Log.d(TAG, "数组遍历");
43+
}
44+
45+
@Override
46+
public void onNext(Integer value) {
47+
Log.d(TAG, "数组中的元素 = "+ value );
48+
}
49+
50+
@Override
51+
public void onError(Throwable e) {
52+
Log.d(TAG, "对Error事件作出响应");
53+
}
54+
55+
@Override
56+
public void onComplete() {
57+
Log.d(TAG, "遍历结束");
58+
}
59+
60+
});
61+
62+
63+
/*
64+
* 集合遍历
65+
**/
66+
// 1. 设置一个集合
67+
List<Integer> list = new ArrayList<>();
68+
list.add(1);
69+
list.add(2);
70+
list.add(3);
71+
72+
// 2. 通过fromIterable()将集合中的对象 / 数据发送出去
73+
Observable.fromIterable(list)
74+
.subscribe(new Observer<Integer>() {
75+
@Override
76+
public void onSubscribe(Disposable d) {
77+
Log.d(TAG, "集合遍历");
78+
}
79+
80+
@Override
81+
public void onNext(Integer value) {
82+
Log.d(TAG, "集合中的数据元素 = "+ value );
83+
}
84+
85+
@Override
86+
public void onError(Throwable e) {
87+
Log.d(TAG, "对Error事件作出响应");
88+
}
89+
90+
@Override
91+
public void onComplete() {
92+
Log.d(TAG, "遍历结束");
93+
}
94+
});
95+
96+
97+
98+
99+
/*
100+
* 周期性操作
101+
**/
102+
// 该例子发送的事件序列特点:延迟2s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个) = 每隔1s进行1次操作
103+
// Observable.interval(2,1,TimeUnit.SECONDS)
104+
// .subscribe(new Observer<Long>() {
105+
// @Override
106+
// public void onSubscribe(Disposable d) {
107+
// Log.d(TAG, "开始采用subscribe连接");
108+
// }
109+
//
110+
// @Override
111+
// public void onNext(Long value) {
112+
// Log.d(TAG, "每隔1s进行1次操作" );
113+
// }
114+
//
115+
// @Override
116+
// public void onError(Throwable e) {
117+
// Log.d(TAG, "对Error事件作出响应");
118+
// }
119+
//
120+
// @Override
121+
// public void onComplete() {
122+
// Log.d(TAG, "对Complete事件作出响应");
123+
// }
124+
//
125+
// });
126+
127+
// 注:interval默认在computation调度器上执行
128+
// 也可自定义指定线程调度器(第3个参数):interval(long,TimeUnit,Scheduler)
129+
130+
131+
132+
/*
133+
* 定时操作
134+
**/
135+
136+
// // 该例子 = 延迟2s后,进行日志输出操作
137+
// Observable.timer(2, TimeUnit.SECONDS)
138+
// .subscribe(new Observer<Long>() {
139+
// @Override
140+
// public void onSubscribe(Disposable d) {
141+
// Log.d(TAG, "开始采用subscribe连接");
142+
// }
143+
//
144+
// @Override
145+
// public void onNext(Long value) {
146+
// }
147+
//
148+
// @Override
149+
// public void onError(Throwable e) {
150+
// Log.d(TAG, "对Error事件作出响应");
151+
// }
152+
//
153+
// @Override
154+
// public void onComplete() {
155+
// Log.d(TAG, "在2s后进行了该操作");
156+
// }
157+
//
158+
// });
159+
160+
// 注:timer操作符默认运行在一个新线程上
161+
// 也可自定义线程调度器(第3个参数):timer(long,TimeUnit,Scheduler)
162+
163+
164+
/*
165+
* 完整创建被观察者
166+
**/
167+
168+
// 步骤1:通过create()创建完整的被观察者对象
169+
// Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
170+
// // 2. 在复写的subscribe()里定义需要发送的事件
171+
// @Override
172+
// public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
173+
// emitter.onNext(1);
174+
// emitter.onNext(2);
175+
// emitter.onNext(3);
176+
// emitter.onComplete();
177+
// }
178+
// }); // 至此,一个完整的被观察者对象创建完毕。
179+
//
180+
// // 步骤2:创建观察者 Observer 并 定义响应事件行为
181+
// Observer<Integer> observer = new Observer<Integer>() {
182+
// @Override
183+
// public void onSubscribe(Disposable d) {
184+
// Log.d(TAG, "开始采用subscribe连接");
185+
// }
186+
//
187+
// @Override
188+
// public void onNext(Integer value) {
189+
// Log.d(TAG, "接收到了事件 = "+ value );
190+
// }
191+
//
192+
// @Override
193+
// public void onError(Throwable e) {
194+
// Log.d(TAG, "对Error事件作出响应");
195+
// }
196+
//
197+
// @Override
198+
// public void onComplete() {
199+
// Log.d(TAG, "对Complete事件作出响应");
200+
// }
201+
// };
202+
//
203+
//
204+
// // 步骤3:通过订阅(subscribe)连接观察者和被观察者
205+
// observable.subscribe(observer);
206+
207+
}
208+
}

0 commit comments

Comments
 (0)