Skip to content

Commit 4d0fea2

Browse files
committed
add push demo
1 parent e7a2e43 commit 4d0fea2

File tree

4 files changed

+178
-37
lines changed

4 files changed

+178
-37
lines changed

app/src/main/AndroidManifest.xml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@
1111
android:roundIcon="@mipmap/ic_launcher_round"
1212
android:supportsRtl="true"
1313
android:theme="@style/AppTheme">
14-
<activity android:name=".establishUsage_Demo.establishUsage">
14+
<activity android:name=".functionUsage_Demo.RxJavafixRetrofit">
1515
<intent-filter>
1616
<action android:name="android.intent.action.MAIN" />
1717

1818
<category android:name="android.intent.category.LAUNCHER" />
1919
</intent-filter>
2020
</activity>
2121

22-
<activity android:name=".MainActivity">
22+
<activity android:name=".establishUsage_Demo.establishUsage">
2323
</activity>
2424
<activity android:name=".establishUsage_Demo.RxJavafixRxjava">
2525
</activity>
@@ -28,6 +28,8 @@
2828

2929
<activity android:name=".FilterUsage">
3030
</activity>
31+
<activity android:name=".MainActivity">
32+
</activity>
3133
</application>
3234

3335
</manifest>

app/src/main/java/scut/carson_ho/rxjava_operators/MainActivity.java

Lines changed: 54 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,12 @@
44
import android.support.v7.app.AppCompatActivity;
55
import android.util.Log;
66

7-
import java.util.ArrayList;
8-
import java.util.List;
97
import java.util.concurrent.TimeUnit;
108

119
import io.reactivex.Observable;
12-
import io.reactivex.ObservableEmitter;
13-
import io.reactivex.ObservableOnSubscribe;
14-
import io.reactivex.ObservableSource;
15-
import io.reactivex.functions.Consumer;
16-
import io.reactivex.functions.Function;
10+
import io.reactivex.Observer;
11+
import io.reactivex.disposables.Disposable;
12+
import io.reactivex.functions.Predicate;
1713

1814

1915
public class MainActivity extends AppCompatActivity {
@@ -25,36 +21,68 @@ protected void onCreate(Bundle savedInstanceState) {
2521
super.onCreate(savedInstanceState);
2622
setContentView(R.layout.activity_main);
2723

28-
// 采用RxJava基于事件流的链式操作
29-
Observable.create(new ObservableOnSubscribe<Integer>() {
24+
25+
// 1. 每1s发送1个数据 = 从0开始,递增1,即0、1、2、3
26+
Observable.interval(1, TimeUnit.SECONDS)
27+
// 2. 通过takeUntil的Predicate传入判断条件
28+
.takeUntil(new Predicate<Long>(){
29+
@Override
30+
public boolean test( Long integer) throws Exception {
31+
return (integer>3);
32+
// 返回true时,就停止发送事件
33+
// 当发送的数据满足>3时,就停止发送Observable的数据
34+
}
35+
}).subscribe(new Observer<Long>() {
36+
@Override
37+
public void onSubscribe(Disposable d) {
38+
}
39+
3040
@Override
31-
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
32-
emitter.onNext(1);
33-
emitter.onNext(2);
34-
emitter.onNext(3);
41+
public void onNext(Long value) {
42+
Log.d(TAG,"发送了事件 "+ value);
3543
}
3644

37-
// 采用flatMap()变换操作符
38-
}).flatMap(new Function<Integer, ObservableSource<String>>() {
3945
@Override
40-
public ObservableSource<String> apply(Integer integer) throws Exception {
41-
final List<String> list = new ArrayList<>();
42-
for (int i = 0; i < 3; i++) {
43-
list.add("我是事件 " + integer + "拆分后的子事件" + i);
44-
// 通过flatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件
45-
// 最终合并,再发送给被观察者
46-
}
47-
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
46+
public void onError(Throwable e) {
4847
}
49-
}).subscribe(new Consumer<String>() {
48+
5049
@Override
51-
public void accept(String s) throws Exception {
52-
Log.d(TAG, s);
50+
public void onComplete() {
5351
}
5452
});
5553

5654

5755

56+
// // 采用RxJava基于事件流的链式操作
57+
// Observable.create(new ObservableOnSubscribe<Integer>() {
58+
// @Override
59+
// public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
60+
// emitter.onNext(1);
61+
// emitter.onNext(2);
62+
// emitter.onNext(3);
63+
// }
64+
//
65+
// // 采用flatMap()变换操作符
66+
// }).flatMap(new Function<Integer, ObservableSource<String>>() {
67+
// @Override
68+
// public ObservableSource<String> apply(Integer integer) throws Exception {
69+
// final List<String> list = new ArrayList<>();
70+
// for (int i = 0; i < 3; i++) {
71+
// list.add("我是事件 " + integer + "拆分后的子事件" + i);
72+
// // 通过flatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件
73+
// // 最终合并,再发送给被观察者
74+
// }
75+
// return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
76+
// }
77+
// }).subscribe(new Consumer<String>() {
78+
// @Override
79+
// public void accept(String s) throws Exception {
80+
// Log.d(TAG, s);
81+
// }
82+
// });
83+
84+
85+
5886
// Observable.create(new ObservableOnSubscribe<Integer>() {
5987
// @Override
6088
// public void subscribe(ObservableEmitter<Integer> e) throws Exception {

app/src/main/java/scut/carson_ho/rxjava_operators/establishUsage_Demo/establishUsage.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,22 @@ protected void onCreate(Bundle savedInstanceState) {
3232
Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
3333
@Override
3434
public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
35-
// 返回新的被观察者 Observable
36-
// 此处有两种情况:
37-
// 1. 原始的Observable不重新发送事件:新的被观察者 Observable发送的事件 = Error事件
38-
// 2. 原始的Observable重新发送事件:新的被观察者 Observable发送的事件 = 数据事件
35+
// 将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
36+
// 以此决定是否重新订阅 & 发送原来的 Observable
37+
// 此处有2种情况:
38+
// 1. 若新被观察者(Observable)返回1个Complete事件,则不重新订阅 & 发送原来的 Observable
39+
// 2. 若新被观察者(Observable)返回1个Next事件,则重新订阅 & 发送原来的 Observable
3940
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
4041
@Override
4142
public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {
4243

43-
// 1. 若返回的Observable发送的事件 = Error事件,则原始的Observable不重新发送事件
44-
// 该异常错误信息可在观察者中的onError()中获得
44+
// 情况1:若新被观察者(Observable)返回1个Complete事件,则不重新订阅 & 发送原来的 Observable
4545
return Observable.empty();
46+
// 返回Error
47+
// return Observable.error(new Throwable("不再重新订阅事件"));
4648

47-
// 2. 若返回的Observable发送的事件 = 数据事件,则原始的Observable重新发送事件(若持续遇到错误,则持续重试)
48-
// return Observable.just(1);
49+
// 情况2:若新被观察者(Observable)返回1个Next事件,则重新订阅 & 发送原来的 Observable
50+
// return Observable.just(1);
4951
}
5052
});
5153

@@ -64,7 +66,8 @@ public void onNext(Integer value) {
6466

6567
@Override
6668
public void onError(Throwable e) {
67-
Log.d(TAG, "对Error事件作出响应");
69+
70+
Log.d(TAG, "对Error事件作出响应:" + e.toString());
6871
}
6972

7073
@Override
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package scut.carson_ho.rxjava_operators.functionUsage_Demo;
2+
3+
import android.os.Bundle;
4+
import android.support.v7.app.AppCompatActivity;
5+
import android.util.Log;
6+
7+
import com.jakewharton.retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
8+
9+
import java.util.concurrent.TimeUnit;
10+
11+
import io.reactivex.Observable;
12+
import io.reactivex.ObservableSource;
13+
import io.reactivex.Observer;
14+
import io.reactivex.android.schedulers.AndroidSchedulers;
15+
import io.reactivex.annotations.NonNull;
16+
import io.reactivex.disposables.Disposable;
17+
import io.reactivex.functions.Function;
18+
import io.reactivex.schedulers.Schedulers;
19+
import retrofit2.Retrofit;
20+
import retrofit2.converter.gson.GsonConverterFactory;
21+
import scut.carson_ho.rxjava_operators.R;
22+
import scut.carson_ho.rxjava_operators.establishUsage_Demo.GetRequest_Interface;
23+
import scut.carson_ho.rxjava_operators.establishUsage_Demo.Translation;
24+
25+
/**
26+
* Created by Carson_Ho on 17/9/9.
27+
*/
28+
29+
public class RxJavafixRetrofit extends AppCompatActivity {
30+
31+
private static final String TAG = "Rxjava";
32+
33+
// 设置变量 = 模拟轮询服务器次数
34+
private int i = 0 ;
35+
36+
@Override
37+
protected void onCreate(Bundle savedInstanceState) {
38+
super.onCreate(savedInstanceState);
39+
setContentView(R.layout.activity_main);
40+
41+
// 步骤1:创建Retrofit对象
42+
Retrofit retrofit = new Retrofit.Builder()
43+
.baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
44+
.addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
45+
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
46+
.build();
47+
48+
// 步骤2:创建 网络请求接口 的实例
49+
GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);
50+
51+
// 步骤3:采用Observable<...>形式 对 网络请求 进行封装
52+
Observable<Translation> observable = request.getCall();
53+
54+
// 步骤4:发送网络请求 & 通过repeatWhen()进行轮询
55+
observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
56+
@Override
57+
// 在Function函数中,必须对输入的 Observable<Object>进行处理,此处使用flatMap操作符接收上游的数据
58+
public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
59+
// 将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
60+
// 以此决定是否重新订阅 & 发送原来的 Observable,即轮询
61+
// 此处有2种情况:
62+
// 1. 若返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable,即轮询结束
63+
// 2. 若返回其余事件,则重新订阅 & 发送原来的 Observable,即继续轮询
64+
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
65+
@Override
66+
public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {
67+
68+
// 加入判断条件:当轮询次数 = 5次后,就停止轮询
69+
if (i > 3) {
70+
// 此处选择发送onError事件以结束轮询,因为可触发下游观察者的onError()方法回调
71+
return Observable.error(new Throwable("轮询结束"));
72+
}
73+
// 若轮询次数<4次,则发送1Next事件以继续轮询
74+
// 注:此处加入了delay操作符,作用 = 延迟一段时间发送(此处设置 = 2s),以实现轮询间间隔设置
75+
return Observable.just(1).delay(2000, TimeUnit.MILLISECONDS);
76+
}
77+
});
78+
79+
}
80+
}).subscribeOn(Schedulers.io()) // 切换到IO线程进行网络请求
81+
.observeOn(AndroidSchedulers.mainThread()) // 切换回到主线程 处理请求结果
82+
.subscribe(new Observer<Translation>() {
83+
@Override
84+
public void onSubscribe(Disposable d) {
85+
}
86+
87+
@Override
88+
public void onNext(Translation result) {
89+
// e.接收服务器返回的数据
90+
result.show() ;
91+
i++;
92+
}
93+
94+
@Override
95+
public void onError(Throwable e) {
96+
// 获取轮询结束信息
97+
Log.d(TAG, e.toString());
98+
}
99+
100+
@Override
101+
public void onComplete() {
102+
103+
}
104+
});
105+
106+
}
107+
}
108+

0 commit comments

Comments
 (0)