Description
1
在RxJS中,有一系列用产生Observable的函数,这些函数有的凭空创造Observable对象,有的根据外部数据源产生Observable对象,更多的是根据其他的Observable中的数据来产生新的Observable对象,也就是把上游数据转化为下游数据,所有这些函数统称为操作符。
操作符其实就是解决某个具体应用问题的模式。创建Observable对象需要创建类操作符,当需要将多个数据流中的数据回合到一起处理时,需要合并类操作符,当需要筛选去除一些数据时,需要过滤类操作符,当希望把数据流中的数据变化为其他数据时,需要转化类操作符,而对数据流的处理可能引起异常,所以需要异常处理类操作符,最后要让一个数据流的数据可以提供给多个观察者,我们需要多播类操作符
操作符也分为静态操作符和实例操作符,静态操作符只能出现åi收尾,实力操作符则可以出现在任何位置。
在实际代码中,如果不想把RxJS全部导入,而是想单独导入每个操作符的相关功能,那么导入代码也会有所区别。
// 导入静态操作符,比如名为of的操作符:
import 'rxjs/add/observable/of';
// 导入实例操作符,比如map
import 'rxjs/add/operator/map';
2
每个操作符都是一个函数,不管实现什么功能,都必须考虑下面这些功能要点:
- 返回一个全新的Observable对象
- 对上游和下游的订阅和退订处理
- 处理异常情况
- 及时释放资源
map操作符的实现
function map(handler){
// 返回一个全新的Observable对象
return new Observable(observer=>{
// 假设this为上游Observable对象
// 订阅上游Observable对象时保留返回值在sub变量中
const sub = this.subscribe({
next: value => {
try{
// 把推送的数据做映射,然后传递给下游
observer.next(handler(value));
}catch(e){
// 出现错误时将错误传递给下游
observer.error(e);
}
},
error: err => observer.error(err),
complete: ()=>observer.complete()
});
return {
unsubscribe: ()=>{
// 当下游退订时,也退订上游的Observable对象,释放相关资源
sub.unsubscribe()
}
}
})
}
上面是操作符函数体部分如何编写,下面就要把这个函数和Observable关联起来
/*
* 1. 打补丁
* 函数声明部分不能使用ES6箭头函数语法,不然函数体中的this就不是Observable对象本身
* 缺点:打补丁的方式会影响到所有的Observable对象
*/
Observable.prototype.map = map;
/*
* 2. 使用bind,call或apply绑定Observable对象
* 可以让自定义操作符只对指定的Observable对象可用
* 缺点:没法用上链式调用
*/
const mapped$ = map.call(source$,x=>x*x);
3
使用bind和call的方法,Observable类本身不会被影响,但是每一个操作符的函数体内依然需要访问this。访问this的函数不是纯函数,所以,所有非纯函数的问题在这些操作符的实现中都可能出现。
/*
* 纯函数的操作符实现:
* map的实现部分看不到对this的访问,而是用一个参数obs$代替了this。
* 这样在数据管道中上游的Observable是以参数的形式传入,而不是靠this来获得,让map彻底成了一个纯函数
*/
function map(handler){
return function(obs$){
return new Observable(observer=>{
return obs$.subscribe({
next: value=> observer.next(handler(value)),
error: err=>observer.error(err),
complete: ()=>observer.complete()
})
})
}
}
map执行返回的结果是一个函数,接受一个Observable对象返回一个Observable对象。这样的纯函数操作符实现成为letable操作符,也成为pipeable操作。就是可以作为pipe操作符参数的操作符。
pipe是Observable类自带的一个操作符,任何Observable对象都支持pipe,无需导入模块。
const source$ = new Observable(onSubscribe).pipe(
map(x=>x*x)
);
RxJS从v5.5.0版本开始引入了letable操作符,大部分操作符都有pipeable操作符实现,注意是”大部分“而不是全部,这是因为:
- 静态类型操作符没有pipeable操作符的对应形式
- 拥有多个上游Observable对象的操作符没有pipeable操作符的对应形式
pipeable操作符导入路径和”打补丁“方式操作符有所区别,以map为例,用”打补丁“的方法,导入的是rxjs/add/operator/map.js
,而pipeable操作符导入的是rxjs/operators/map.js
(注意路径的operators是复数)
pipe还具有管道功能,可以把多个pipeable操作符串接起来,形成数据管道。
import {of} from 'rxjs/observable/of';
import {map, filter} from 'rxjs/operators';
/*
* 也可以这样导入
* import {map} from 'rxjs/operators/map.js';
* import {filter} from 'rxjs/operators/filter.js';
* 两种形式一样,Tree Shaking技术可以对lettable操作符产生删除死代码的效果
*/
const source$ = of(1,23);
const result$ = source$.pipe(
filter(x=>x%2===0),
map(x=>x*2)
)
result$.subscribe(console.log)
有四个操作符比较特殊,传统的操作符名称和pipeable操作符名称不同,它们是:
- do -> tap
- catch -> catchError
- switch -> switchAll
- finally -> finalize
这四个操作符的名称都是JavaScript的关键字,以打补丁的方式赋值为Observable.prototype的某个属性值没问题,但是不能作为变量或者函数的标识符出现。