Description
RxJS-Subject(主体)
Subject是一种特殊类型的Observable,它允许将值多播给多个观察者。所以Subject是多播的,而普通的Observables是单播的(每个已订阅的观察者都拥有Observable的独立执行)。
每个Subject都是Observable 对于 Subject,可以提供一个观察者并使用 subscribe
方法,就可以开始正常接收值。从观察者的角度而言,它无法判断 Observable 执行是来自普通的 Observable 还是 Subject。
每个Subject都是观察者 Subject是一个拥有如下方法的对象:next()
, error()
, complete()
。要给Subject提供新值,只要调用next(value)
,它会将值多播给已注册监听该subject的观察者们。
示例
var subject = new Rx.Subject();
subject.subscribe({
next: v => {
console.log('observerA: '+v)
}
})
subject.subscribe({
next: v => {
console.log('observerB: '+v)
}
})
subject.next(1);
subject.next(2);
控制台输出
observerA: 1
observerB: 1
observerA: 2
observerB: 2
因为subject是观察者,所以可以将subject作为参数传递给任何Observable的subscribe
方法
var subject = new Rx.Subject();
subject.subscribe({
next: v => {
console.log('observerA: '+v)
}
})
subject.subscribe({
next: v => {
console.log('observerB: '+v)
}
})
var observable = Rx.Observable.from([1,2,3]);
observable.subscribe(subject); // 可以提供一个 Subject 进行订阅
控制台输出
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
上面的方法通过subject将单播的Observable执行转换为多播的。Subjects也是将任意Observable执行共享给多个观察者的唯一方式
多播的Observables
“多播的Observable”通过subject来发送通知,这个subject可能有多个订阅者,然而普通的Observable只能发送给单个观察者。
多播Observable在底层是通过使用subject使得多个观察者可以看见同一个Observable执行
在底层,则就是multicast
操作符的工作原理:观察者订阅一个基础的subject,然后subject订阅源Observable
var source = Rx.Observable.from([1,2,3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
multicasted.subscribe({
next: v => {
console.log('observerA: '+ v)
}
})
multicasted.subscribe({
next: v => {
console.log('observerB: '+ v)
}
})
multicasted.connect();
multicast
操作符返回一个observable,它看起来和普通的 Observable 没什么区别,但当订阅时就像是 Subject 。multicast返回的是ConnectableObservable
,它只是一个有connect
方法的observable。
connect
方法决定了何时启动共享的observable执行。它返回的是subscription
,可以取消订阅以取消共享的 Observable 执行。
引用计数
如果不想显式调用connect()
,可以使用ConnectableObservable
,的refCount()
方法(引用计数),这个方法返回Observable。
refCount的作用是当有第一个订阅者时,多播 Observable 会自动地启动执行。而当最后一个订阅者离开时,多播 Observable 会自动地停止执行。
BehaviorSubject
Subject的其中一个变体就是BehaviorSubject
,它有一个当前值的概念,它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从BehaviorSubject
那里接收到当前值。
在下面的示例中,BehaviorSubject 使用值0进行初始化,当第一个观察者订阅时会得到0。第二个观察者订阅时会得到值2,尽管它是在值2发送之后订阅的。
var subject = new Rx.BehaviorSubject(0); // 0是初始值
subject.subscribe({
next: v => {
console.log(v)
}
})
subject.next(1)
subject.next(2)
subject.subscribe({
next: v => {
console.log(v)
}
})
subject.next(3)
输出:
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
ReplaySubject
ReplaySubject
类似于 BehaviorSubject
,它可以发送旧值给新的订阅者,但它还可以记录 Observable 执行的一部分。ReplaySubject 记录 Observable 执行中的多个值并将其回放给新的订阅者。
当创建ReplaySubject时,可以指定回放多少个值。除了缓冲数量,还可以指定 window time (以毫秒为单位)来确定多久之前的值可以记录。
var subject = new Rx.ReplaySubject(3); // 为新的订阅者缓冲3个值
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
输出:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
AsyncSubject
AsyncSubject
是另一个Subject变体,只有当Observable执行完成(执行complete()
)时,它才会将执行的最后一个值发送给观察者
var subject = new Rx.AsyncSubject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
输出:
observerA: 5
observerB: 5