1
- # 实现自己的操作符
2
-
1
+ # 实现自己的操作符
2
+
3
3
你可以实现你自己的Observable操作符,本文展示怎么做。
4
4
5
- 如果你的操作符是被用于* 创造* 一个Observable,而不是变换或者响应一个Observable,使用 [ ` create( ) ` ] ( http://reactivex.io/documentation/operators/create.html ) 方法,不要试图手动实现 ` Observable ` 。另外,你可以按照下面的用法说明创建一个自定义的操作符。
5
+ 如果你的操作符是被用于* 创造* 一个Observable,而不是变换或者响应一个Observable,使用 [ ` create( ) ` ] ( http://reactivex.io/documentation/operators/create.html ) 方法,不要试图手动实现 ` Observable ` 。另外,你可以按照下面的用法说明创建一个自定义的操作符。
6
6
7
7
如果你的操作符是用于Observable发射的单独的数据项,按照下面的说明做:[ _ Sequence Operators_ ] ( Implementing-Your-Own-Operators#序列操作符 ) 。如果你的操作符是用于变换Observable发射的整个数据序列,按照这个说明做:[ _ Transformational Operators_ ] ( Implementing-Your-Own-Operators#变换操作符 ) 。
8
8
9
9
** 提示:** 在一个类似于Groovy的语言Xtend中,你可以以 _ extension methods_ 的方式实现你自己的操作符 ,不使用本文的方法,它们也可以链式调用。详情参见 [ RxJava and Xtend] ( http://mnmlst-dvlpr.blogspot.de/2014/07/rxjava-and-xtend.html )
10
10
11
11
# 序列操作符
12
-
13
- 下面的例子向你展示了怎样使用` lift( ) ` 操作符将你的自定义操作符(在这个例子中是 ` myOperator ` )与标准的RxJava操作符(如` ofType ` 和` map ` )一起使用:
12
+
13
+ 下面的例子向你展示了怎样使用` lift( ) ` 操作符将你的自定义操作符(在这个例子中是 ` myOperator ` )与标准的RxJava操作符(如` ofType ` 和` map ` )一起使用:
14
14
15
15
``` groovy
16
16
fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new MyOperator<T>()).map({"transformed by myOperator: " + it});
17
- ```
17
+ ```
18
18
19
19
下面这部分向你展示了你的操作符的脚手架形式,以便它能正确的与` lift() ` 搭配使用。
20
20
21
21
## 实现你的操作符
22
22
23
- 将你的自定义操作符定义为实现了 [ ` Operator ` ] ( http://reactivex.io/RxJava/javadoc/rx/Observable.Operator.html ) 接口的一个公开类, 就像这样:
23
+ 将你的自定义操作符定义为实现了 [ ` Operator ` ] ( http://reactivex.io/RxJava/javadoc/rx/Observable.Operator.html ) 接口的一个公开类, 就像这样:
24
24
25
25
``` java
26
26
public class MyOperator <T> implements Operator<T > {
@@ -62,13 +62,13 @@ public class MyOperator<T> implements Operator<T> {
62
62
63
63
# 变换操作符
64
64
65
- 下面的例子向你展示了怎样使用 ` compose( ) ` 操作符将你得自定义操作符(在这个例子中,是一个名叫` myTransformer ` 的操作符,它将一个发射整数的Observable转换为发射字符串的)与标准的RxJava操作符(如` ofType ` 和` map ` )一起使用:
65
+ 下面的例子向你展示了怎样使用 ` compose( ) ` 操作符将你得自定义操作符(在这个例子中,是一个名叫` myTransformer ` 的操作符,它将一个发射整数的Observable转换为发射字符串的)与标准的RxJava操作符(如` ofType ` 和` map ` )一起使用:
66
66
67
67
``` groovy
68
68
fooObservable = barObservable.ofType(Integer).map({it*2}).compose(new MyTransformer<Integer,String>()).map({"transformed by myOperator: " + it});
69
- ```
69
+ ```
70
70
71
- 下面这部分向你展示了你的操作符的脚手架形式,以便它能正确的与` compose() ` 搭配使用。
71
+ 下面这部分向你展示了你的操作符的脚手架形式,以便它能正确的与` compose() ` 搭配使用。
72
72
73
73
## 实现你的变换器
74
74
@@ -82,8 +82,8 @@ public class MyTransformer<Integer,String> implements Transformer<Integer,String
82
82
83
83
@Override
84
84
public Observable<String > call (Observable<Integer > source ) {
85
- /*
86
- * 这个简单的例子Transformer应用一个map操作,
85
+ /*
86
+ * 这个简单的例子Transformer应用一个map操作,
87
87
* 这个map操作将发射整数变换为发射整数的字符串表示。
88
88
*/
89
89
return source. map( new Func1<Integer ,String > () {
@@ -101,21 +101,21 @@ public class MyTransformer<Integer,String> implements Transformer<Integer,String
101
101
* [ &ldquo ; Don’ ; t break the chain: use RxJava’ ; s compose() operator&rdquo ; ] ( http://blog.danlew.net/2015/03/02/dont-break-the-chain/ ) by Dan Lew
102
102
103
103
# 其它需要考虑的
104
-
105
- * 在发射任何数据(或者通知)给订阅者之前,你的序列操作符可能需要检查它的 [ ` Subscriber.isUnsubscribed( ) ` ] ( Observable#unsubscribing ) 状态,如果没有订阅者了,没必要浪费时间生成数据项。
106
- * 请注意:你的序列操作符必须复合Observable协议的核心原则:
107
- * 它可能调用订阅者的 [ ` onNext( ) ` ] ( Observable#onnext-oncompleted-and-onerror ) 方法任意次,但是这些调用必须是不重叠的。
108
- * 它只能调用订阅者的 [ ` onCompleted( ) ` ] ( Observable#onnext-oncompleted-and-onerror ) 或 [ ` onError( ) ` ] ( Observable#onnext-oncompleted-and-onerror ) 正好一次,但不能都调用,而且不能在这之后调用订阅者的 [ ` onNext( ) ` ] ( Observable#onnext-oncompleted-and-onerror ) 方法。
104
+
105
+ * 在发射任何数据(或者通知)给订阅者之前,你的序列操作符可能需要检查它的 [ ` Subscriber.isUnsubscribed( ) ` ] ( Observable#unsubscribing ) 状态,如果没有订阅者了,没必要浪费时间生成数据项。
106
+ * 请注意:你的序列操作符必须复合Observable协议的核心原则:
107
+ * 它可能调用订阅者的 [ ` onNext( ) ` ] ( Observable#onnext-oncompleted-and-onerror ) 方法任意次,但是这些调用必须是不重叠的。
108
+ * 它只能调用订阅者的 [ ` onCompleted( ) ` ] ( Observable#onnext-oncompleted-and-onerror ) 或 [ ` onError( ) ` ] ( Observable#onnext-oncompleted-and-onerror ) 正好一次,但不能都调用,而且不能在这之后调用订阅者的 [ ` onNext( ) ` ] ( Observable#onnext-oncompleted-and-onerror ) 方法。
109
109
* 如果你不能保证你得操作符遵从这两个原则,你可以给它添加 [ ` serialize( ) ` ] ( Observable-Utility-Operators#serialize ) 操作符,它会强制保持正确的行为。
110
110
* 请关注这里 [ Issue #1962 ] ( https://github.com/ReactiveX/RxJava/issues/1962 ) &mdash;需要有一个计划创建一个测试脚手架,你可以用它来写测试验证你的新操作符遵从了Observable协议。
111
111
* 不要让你的操作符阻塞别的操作。
112
- * When possible, you should compose new operators by combining existing operators, rather than implementing them with new code. RxJava itself does this with some of its standard operators, for example:
112
+ * When possible, you should compose new operators by combining existing operators, rather than implementing them with new code. RxJava itself does this with some of its standard operators, for example:
113
113
* 如果可能,你应该组合现有的操作符创建你的新操作符,而不是从零开始实现它。RxJava自身的标准操作符也是这样做的,例如:
114
114
* [ ` first( ) ` ] ( http://reactivex.io/documentation/operators/first.html ) 被定义为 <tt >[ take(1)] ( http://reactivex.io/documentation/operators/take.html ) .[ single( )] ( http://reactivex.io/documentation/operators/first.html ) </tt >
115
115
* [ ` ignoreElements( ) ` ] ( http://reactivex.io/documentation/operators/ignoreelements.html ) 被定义为 <tt >[ filter(alwaysFalse( ))] ( http://reactivex.io/documentation/operators/filter.html ) </tt >
116
- * [ ` reduce(a) ` ] ( http://reactivex.io/documentation/operators/reduce.html ) 被定义为 <tt >[ scan(a)] ( http://reactivex.io/documentation/operators/scan.html ) .[ last( )] ( http://reactivex.io/documentation/operators/last.html ) </tt >
117
- * 如果你的操作符使用了函数或者lambda表达式作为参数,请注意它们可能是异常的来源,而且要准备好捕获这些异常,并且使用 ` onError() ` 通知订阅者。
118
- * 某些异常被认为是致命的,对它们来说,调用 ` onError() ` 毫无意义,那样或者是无用的,或者只是对问题的妥协。你可以使用 ` Exceptions.throwIfFatal(throwable) ` 方法过滤掉这些知名的异常 ,并重新抛出它们,而不是试图发射关于它们的通知。
119
- * 一般说来,一旦发生错误应立即通知订阅者,而不是首先尝试发射更多的数据。
120
- * 请注意 ` null ` 可能是Observable发射的一个合法数据。频繁发生错误的一个来源是:测试一些变量并且将持有一个非 ` null ` 值作为是否发射了数据的替代。一个值为 ` null ` 的数据仍然是一个发射数据项,它与没有发射任何东西是不能等同的。
121
- * 想让你的操作符在反压(* backpressure* )场景中变得得好可能会非常棘手。可以参考Dávid Karnok的博客 [ Advanced RxJava] ( http://akarnokd.blogspot.hu/ ) ,这里有一个涉及到的各种因素和怎样处理它们的很值得看的讨论。
116
+ * [ ` reduce(a) ` ] ( http://reactivex.io/documentation/operators/reduce.html ) 被定义为 <tt >[ scan(a)] ( http://reactivex.io/documentation/operators/scan.html ) .[ last( )] ( http://reactivex.io/documentation/operators/last.html ) </tt >
117
+ * 如果你的操作符使用了函数或者lambda表达式作为参数,请注意它们可能是异常的来源,而且要准备好捕获这些异常,并且使用 ` onError() ` 通知订阅者。
118
+ * 某些异常被认为是致命的,对它们来说,调用 ` onError() ` 毫无意义,那样或者是无用的,或者只是对问题的妥协。你可以使用 ` Exceptions.throwIfFatal(throwable) ` 方法过滤掉这些致命的异常 ,并重新抛出它们,而不是试图发射关于它们的通知。
119
+ * 一般说来,一旦发生错误应立即通知订阅者,而不是首先尝试发射更多的数据。
120
+ * 请注意 ` null ` 可能是Observable发射的一个合法数据。频繁发生错误的一个来源是:测试一些变量并且将持有一个非 ` null ` 值作为是否发射了数据的替代。一个值为 ` null ` 的数据仍然是一个发射数据项,它与没有发射任何东西是不能等同的。
121
+ * 想让你的操作符在反压(* backpressure* )场景中变得得好可能会非常棘手。可以参考Dávid Karnok的博客 [ Advanced RxJava] ( http://akarnokd.blogspot.hu/ ) ,这里有一个涉及到的各种因素和怎样处理它们的很值得看的讨论。
0 commit comments