Skip to content

RxJS-Subject(主体) #2

Open
Open
@isNeilLin

Description

@isNeilLin

RxJS-Subject(主体)

Subject是一种特殊类型的Observable,它允许将值多播给多个观察者。所以Subject是多播的,而普通的Observables是单播的(每个已订阅的观察者都拥有Observable的独立执行)。

每个Subject都是Observable  对于 Subject,可以提供一个观察者并使用 subscribe 方法,就可以开始正常接收值。从观察者的角度而言,它无法判断 Observable 执行是来自普通的 Observable 还是 Subject。

每个Subject都是观察者 Subject是一个拥有如下方法的对象:next(), error(), complete()。要给Subject提供新值,只要调用next(value),它会将值多播给已注册监听该subject的观察者们。

示例

var subject = new Rx.Subject();
subject.subscribe({
	next: v => {
		console.log('observerA: '+v)
	}
})
subject.subscribe({
	next: v => {
		console.log('observerB: '+v)
	}
})
subject.next(1);
subject.next(2);

控制台输出

observerA: 1
observerB: 1
observerA: 2
observerB: 2

因为subject是观察者,所以可以将subject作为参数传递给任何Observable的subscribe方法

var subject = new Rx.Subject();

subject.subscribe({
	next: v => {
		console.log('observerA: '+v)
	}
})
subject.subscribe({
	next: v => {
		console.log('observerB: '+v)
	}
})
var observable = Rx.Observable.from([1,2,3]);
observable.subscribe(subject); // 可以提供一个 Subject 进行订阅

控制台输出

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

上面的方法通过subject将单播的Observable执行转换为多播的。Subjects也是将任意Observable执行共享给多个观察者的唯一方式

多播的Observables

“多播的Observable”通过subject来发送通知,这个subject可能有多个订阅者,然而普通的Observable只能发送给单个观察者。
多播Observable在底层是通过使用subject使得多个观察者可以看见同一个Observable执行

在底层,则就是multicast操作符的工作原理:观察者订阅一个基础的subject,然后subject订阅源Observable

var source = Rx.Observable.from([1,2,3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

multicasted.subscribe({
	next: v => {
		console.log('observerA: '+ v)
	}
})
multicasted.subscribe({
	next: v => {
		console.log('observerB: '+ v)
	}
})
multicasted.connect();

multicast操作符返回一个observable,它看起来和普通的 Observable 没什么区别,但当订阅时就像是 Subject 。multicast返回的是ConnectableObservable,它只是一个有connect方法的observable。
connect方法决定了何时启动共享的observable执行。它返回的是subscription,可以取消订阅以取消共享的 Observable 执行。

引用计数

如果不想显式调用connect(),可以使用ConnectableObservable,的refCount()方法(引用计数),这个方法返回Observable。
refCount的作用是当有第一个订阅者时,多播 Observable 会自动地启动执行。而当最后一个订阅者离开时,多播 Observable 会自动地停止执行。

BehaviorSubject

Subject的其中一个变体就是BehaviorSubject,它有一个当前值的概念,它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从BehaviorSubject那里接收到当前值。

在下面的示例中,BehaviorSubject 使用值0进行初始化,当第一个观察者订阅时会得到0。第二个观察者订阅时会得到值2,尽管它是在值2发送之后订阅的。

var subject = new Rx.BehaviorSubject(0); // 0是初始值

subject.subscribe({
	next: v => {
		console.log(v)
	}
})

subject.next(1)
subject.next(2)

subject.subscribe({
	next: v => {
		console.log(v)
	}
})
subject.next(3)

输出:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject

ReplaySubject 类似于 BehaviorSubject,它可以发送旧值给新的订阅者,但它还可以记录 Observable 执行的一部分。ReplaySubject 记录 Observable 执行中的多个值并将其回放给新的订阅者。

当创建ReplaySubject时,可以指定回放多少个值。除了缓冲数量,还可以指定 window time (以毫秒为单位)来确定多久之前的值可以记录。

var subject = new Rx.ReplaySubject(3); // 为新的订阅者缓冲3个值

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

输出:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

AsyncSubject

AsyncSubject是另一个Subject变体,只有当Observable执行完成(执行complete())时,它才会将执行的最后一个值发送给观察者

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

输出:

observerA: 5
observerB: 5

Metadata

Metadata

Assignees

No one assigned

    Labels

    框架&工具库Vue和React等前端框架或工具库相关

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions