Description
定义:所谓高阶Observable指的是产生的数据依然是Observable的Observable。
一个高阶Observable的例子:
const ho$ = interval(1000)
.take(2)
.map(x=>interval(1500).map(y=>`${x}: ${y}`).take(2));
// ho$是一个高阶Observable对象,interval产生的两个数据0,1通过map把0和1映射为新的Observable对象
因为高阶Observable实际上涉及多个数据流,所以会展示多条横轴,最上面的一条横轴代表的就是高阶Observable本身,下面的多条横轴代表的就是高阶Observable的某个Observable形式的具体数据。高阶Observable完结不代表内部Observable完结,因为作为独立的Observable,它们有自己的生命周期
高阶Observable的意义
数据流虽然管理的是数据,数据流自身也可以认为是一种数据,既然数据可以用Observable管理,那么数据流也可以用Observable来管理,让需要被管理的Observable对象成为其他Observable对象的数据,用现成的管理Observable对象的方法来管理Observable对象,这就是高阶Observable的意义。高阶Observable对象的本质是用管理数据的方式来管理多个Observable对象。
操作高阶Observable的合并类操作符
处理高阶Observable的合并类操作符,名称就是在基本的合并类操作符(包括concat,merge,zip,combineLatest)名称的结尾加上All,如下所示:
- concatAll
- mergeAll
- zipAll
- combineAll(这个是个例外,因为combineLatestAll显得有点啰嗦)
All代表“全部”,这些操作符的功能有差异,但都是把一个高阶Observable的所有内部Observable都组合起来,所有这类操作符全部都只有实例操作符的形式。
concatAll
concatAll只有一个上游Observable对象,这个Observable对象预期是一个高阶Observable对象,concatAll会对其中的内部Observable对象做concat的操作。
const ho$ = interval(1000)
.take(2)
.map(x=>interval(1500).map(y=>`${x}: ${y}`).take(2));
const concated$ = ho$.concatAll();
// 可以看到concatAll没有任何参数,一切输入都来自上游的Observable对象。
/*
输出结果
0:0
0:1
1:0
1:1
complete
*/
concatAll首先会订阅上游产生的第一个内部Observable对象,抽取其中的数据,然后,只有当第一个Observable对象完结的时候才会去订阅第二个内部Observable对象。下图的弹珠图展示了完整过程:
和concat一样,如果前一个Observable没有完结,那么concatAll就不会订阅下一个内部Observable对象,这导致一个问题,如果上游的高阶Observable对象持续不断产生Observable对象但是这些Observable对象又异步产生数据,以至于concatAll合并的速度赶不上上游产生新的Observable对象的速度,这就会造成Observable的积压。
mergeAll
const ho$ = interval(1000)
.take(2)
.map(x=>interval(1500).map(y=>`${x}: ${y}`).take(2));
const merged$ = ho$.mergeAll();
/*
输出结果
0:0
1:0
0:1
1:1
complete
*/
上面的代码产生的弹珠图如下所示:
mergeAll对内部Observable的订阅策略和concatAll不同,mergeAll只要发现上游产生一个内部Observable就会立刻订阅,并从中抽取数据。所以,第二个内部Observable产生的数据1:0会在第一个内部Observable产生的数据0:1之前。
zipAll
const ho$ = interval(1000)
.take(2)
.map(x=>interval(1500).map(y=>`${x}: ${y}`).take(2));
const zipped$ = ho$.zipAll();
/*
输出结果
[ '0:0' , '1:0' ]
[ '0:1' , '1:1' ]
complete
*/
如果上游的高阶Observable没有完结,那么zipAll就不会开始工作
combineAll
const ho$ = interval(1000)
.take(2)
.map(x=>interval(1500).map(y=>`${x}: ${y}`).take(2));
const combined$ = ho$.combineAll();
/*
输出结果
[ '0:0' , '1:0' ]
[ '0:1' , '1:0' ]
[ '0:1' , '1:1' ]
complete
*/
combineAll和zipAll一样,必须上游高阶Observable完结之后才能开始给下游产生数据,因为只有确定了作为输入的内部Observable对象的个数,才能拼凑出第一个传给下游的数据
concat对于concatAll,merge对于mergeAll,zip对于zipAll,combineLatest对于combineAll,主要的区别只是作为输入的Observable对象的形式变化。不带All的操作符输入Observable对象是以操作符调用主体对象或者函数参数形式出现,带All的操作符输入Observable对象是以上游高阶Observable对象产生的内部Observable对象形式出现。
进化的高阶Observable处理
上面介绍过concatAll存在一个问题,当上游高阶Observable产生Observable对象的速度过快,快过内部Observable产生数据的速度,因为concatAll要做无损的数据流连接,就会造成数据积压。实际上,很多场景下并不需要无损的数据流连接,也就是说可以舍弃掉一些数据,至于怎么舍弃就涉及另外两个合并类操作符,分别是switch和exhaust,这两个操作符是concatAll的进化版本。
switch
switch的含义就是“切换”,总是切换到最新的内部Observable对象获取数据。每个switch的上游高阶Observable对象产生一个内部Observable对象,switch都会立刻订阅最新的内部Observable,如果已经订阅了之前的内部Observable对象,就会退订那个过时的内部Observable对象。
const ho$ = interval(1000)
.take(3)
.map(x=>interval(700).map(y=>`${x}: ${y}`).take(2));
const concated$ = ho$.switch();
/*
输出结果:
0: 0
1: 0
2: 0
2: 1
Complete!
*/
上面代码对应的弹珠图:
值得注意的是switch产生的Observable对象完结基于两个条件:上游高阶Observable已经完结和当前内部Observable已经完结。只满足其中一个条件,并不会让switch产生的Observable对象完结。
exhaust
exhaust的含义就是“耗尽”,这个操作符的意思是在耗尽当前内部Observable的数据之前不会切换到下一个内部Observable对象。exhaust的策略和switch相反,当内部Observable对象在时间上发生重叠时,情景就是前一个内部Observable还没有完结,而新的Observable又已经产生,到底应该选择哪个作为数据源?switch选择新产生的内部Observable对象,exhaust则选择前一个内部Observable对象。
const ho$ = interval(1000)
.take(3)
.map(x=>interval(700).map(y=>`${x}: ${y}`).take(2));
const result$ = ho$.exhaust();
/*
输出结果:
0: 0
0: 1
2: 0
2: 1
Complete!
*/
第二个内部Observable对象生不逢时,当它产生的时候第一个内部Observable对象还没有完结,这个时候exhaust会直接忽略第二个Observable对象,甚至不会去订阅它;第三个内部Observable对象会被订阅并提取数据,是因为在它出现之前,第一个内部Observable对象已经完结了。
和switch一样,exhaust产生的Observable对象完结前提是,最新的内部Observable对象完结而且上游高阶Observable对象完结。