Skip to content

Commit 4d47736

Browse files
committed
add combinlateset
1 parent 5eda2a6 commit 4d47736

File tree

1 file changed

+58
-18
lines changed

1 file changed

+58
-18
lines changed

app/src/main/java/scut/carson_ho/rxjava_operators/MainActivity.java

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
import android.support.v7.app.AppCompatActivity;
55
import android.util.Log;
66

7+
import java.util.concurrent.TimeUnit;
8+
79
import io.reactivex.Observable;
8-
import io.reactivex.annotations.NonNull;
910
import io.reactivex.functions.BiFunction;
1011
import io.reactivex.functions.Consumer;
1112

@@ -19,23 +20,62 @@ protected void onCreate(Bundle savedInstanceState) {
1920
super.onCreate(savedInstanceState);
2021
setContentView(R.layout.activity_main);
2122

22-
Observable.just(1,2,3,4)
23-
.reduce(new BiFunction<Integer, Integer, Integer>() {
24-
// 在该复写方法中复写聚合的逻辑
25-
@Override
26-
public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
27-
Log.e(TAG, "本次计算的数据是: "+s1 +" 乘 "+ s2);
28-
return s1 * s2;
29-
// 本次聚合的逻辑是:全部数据相乘起来
30-
// 原理:第1次取前2个数据相乘,之后每次获取到的数据 = 返回的数据x原始下1个数据每
31-
}
32-
}).subscribe(new Consumer<Integer>() {
33-
@Override
34-
public void accept(@NonNull Integer s) throws Exception {
35-
Log.e(TAG, "最终计算的结果是: "+s);
36-
37-
}
38-
});
23+
24+
25+
Observable.combineLatest(
26+
Observable.intervalRange(0, 3, 0, 2, TimeUnit.SECONDS), // 第1个发送数据事件的Observable
27+
Observable.intervalRange(0, 3, 0, 1, TimeUnit.SECONDS), // 第2个发送数据事件的Observable:从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
28+
new BiFunction<Long, Long, Long>() {
29+
@Override
30+
public Long apply(Long o1, Long o2) throws Exception {
31+
// o1 = 第1个Observable发送的最新(最后)1个数据
32+
// o2 = 第2个Observable发送的每1个数据
33+
Log.e(TAG, "合并的数据是: "+ o1 + " "+ o2);
34+
return o1 + o2;
35+
// 合并的逻辑 = 相加
36+
// 即第1个Observable发送的最后1个数据 与 第2个Observable发送的每1个数据进行相加
37+
}
38+
}).subscribe(new Consumer<Long>() {
39+
@Override
40+
public void accept(Long s) throws Exception {
41+
Log.e(TAG, "合并的结果是: "+s);
42+
}
43+
});
44+
45+
//
46+
// Observable.concatArrayDelayError(
47+
// Observable.create(new ObservableOnSubscribe<Integer>() {
48+
// @Override
49+
// public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
50+
//
51+
// emitter.onNext(1);
52+
// emitter.onNext(2);
53+
// emitter.onNext(3);
54+
// emitter.onError(new NullPointerException()); // 发送Error事件,因为使用了concatDelayError,所以第2个Observable将会发送事件,等发送完毕后,再发送错误事件
55+
// emitter.onComplete();
56+
// }
57+
// }),
58+
// Observable.just(4, 5, 6))
59+
// .subscribe(new Observer<Integer>() {
60+
// @Override
61+
// public void onSubscribe(Disposable d) {
62+
//
63+
// }
64+
// @Override
65+
// public void onNext(Integer value) {
66+
// Log.d(TAG, "接收到了事件"+ value );
67+
// }
68+
//
69+
// @Override
70+
// public void onError(Throwable e) {
71+
// Log.d(TAG, "对Error事件作出响应");
72+
// }
73+
//
74+
// @Override
75+
// public void onComplete() {
76+
// Log.d(TAG, "对Complete事件作出响应");
77+
// }
78+
// });
3979

4080
// 在一个被观察者发送事件前,追加发送被观察者 & 发送数据
4181
// 注:追加数据顺序 = 后调用先追加

0 commit comments

Comments
 (0)