理解 Subject

在上一篇博文中介绍了 Observable 和 Observer 的概念与用法。

它们是理解 Subject 的基础和前提。建议先行阅读。

1.什么是 Subject

在 RxJS 中 Subject 是混合体。它既是 Observable 又是 Observer

这意味着它既能像 Observable 一样生成数据流,被 Observer 所订阅,

也可以像 Observer 一样消费数据流。

2.为何使用 Subject

Subject 的特点在于多播( Multicast )。

这与 Observable 单播(Unicast)有很大差别。

单播意味着,每个 Observer 订阅的 Observable 对象都是不同的。

// Observable is unicast
const observable = Rx.Observable.create((observer) => {
    observer.next(Math.random());
});

// subscription 1
observable.subscribe((data) => {
  console.log('observableID:' + data); // 0.24957144215097515
});

// subscription 2
observable.subscribe((data) => {
   console.log('observableID:' + data); // 0.004617340049055896
});

而多播相反, 所有的 Observer 订阅的 Observable 对象都是相同的。

// Subject is multiicast
const subject = new Rx.Subject();

// subscriber 1
subject.subscribe((data) => {
  console.log('subjectID:' + data); // 0.24957144215097515 (random number)
});

// subscriber 2
subject.subscribe((data) => {
  console.log('subjectID:' + data); // 0.24957144215097515 (random number)
});

observable.subscribe(subject);

Subject 多播的特性可以让 Observer 共享数据变得更加容易。

因此,在这些场景下,考虑使用 Subject 是个不错的选择。

3.如何使用 Subject

3.1.简单例子

Subject 因为是 Observable 与 Observer 的混合体,因此使用方法也相近。

类似于 Observable,

Observer 可以利用 Subject 对象的 subscribe 方法对数据流进行订阅。

Subject 对象直接调用 next 方法就可以将值发送给订阅它的 Observers。

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

类似于 Observer,

Subject 本身也可以订阅其他 Observable 对象。

这样一来 Subject 本身的订阅者也会收到 Observable 对象发送的数据流。

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // You can subscribe providing a Subject

但是 Subject 的订阅者会忽略订阅之前的数据流。这在 3.2 节会有例子说明。

3.2.Replay Subject

Relay Subject 是 Subject 的一个变种。

一般的 Subject 订阅者会忽略订阅之前的数据流。

//----------------------------------------------------------------------
//normal subject
//output
//From subscription 1:after subscription1
//From subscription 1:after subscription2
//From subscription 2:after subscription2

const subject = new Rx.Subject();

subject.next('previous');

const subscription1 = subject.subscribe(x => {
  console.log('From subscription 1:', x);
});


subject.next('after subscription1');

const subscription2 = subject.subscribe(x => {
  console.log('From subscription 2:', x);
});

subject.next('after subscription2');

而 ReplaySubject 能够缓存最近的若干数据流(可以在初始化时指定)。

这样后继的 Observer 一旦订阅也能获得这些最近的数据。

//replay subject
//output
//From subscription 1:after subscription1
//From subscription 2:previous
//From subscription 2:after subscription1
//From subscription 1:after subscription2
//From subscription 2:after subscription2

const subject = new Rx.ReplaySubject(5);

subject.next('previous');

const subscription1 = subject.subscribe(x => {
  console.log('From subscription 1:', x);
});


subject.next('after subscription1');

const subscription2 = subject.subscribe(x => {
  console.log('From subscription 2:', x);
});

subject.next('after subscription2');

3.3.Behavior Subject

Behavior Subject 是 Subject 的一个变种。

和 Replay Subject 类似,它也可以缓存数据流。但是只是最近的一个。

并且通过指定初始值。即使目前没有任何数据流。

Observer 也能订阅到这个初始值。

//behavior subject
//output
//From subscription 1:previous
//From subscription 1:after subscription1
//From subscription 2:after subscription1
//From subscription 1:after subscription2
//From subscription 2:after subscription2

const subject = new Rx.BehaviorSubject('previous');

const subscription1 = subject.subscribe(x => {
  console.log('From subscription 1:', x);
});


subject.next('after subscription1');

const subscription2 = subject.subscribe(x => {
  console.log('From subscription 2:', x);
});

subject.next('after subscription2');

本文所有的示例代码都可以在线编辑查看。

4.陷阱

Subject 是一个非常强大的响应式对象。但是有细节需要注意:Subject 本身不缓存任何数据流。这意味着订阅之前next()出来的数据无法被捕捉到。

// No output as subject.next ahead of subject.subsribe()
const subject = new Rx.Subject();
subject.next('hello')
subject.subsribe(data => console.log(data))

Behavior Subject 和 Replay Subject 可以缓存最近发出的1次或几次的数据流,这样的对象在订阅的时候就可以接收到之前释放的数据流。

// Output hello
const subject = new Rx.BehaviorSubject();
subject.next('hello')
subject.subsribe(data => console.log(data))

参考文献

Understanding rxjs Subjects

On The Subject Of Subjects (in RxJS)

RxJS: Subjects, Behavior Subjects & Replay Subjects

RxJS 官方文档 Subject

ReactiveX 官方文档 Subject

推荐阅读

理解 Observable 与 Observer

发表评论

电子邮件地址不会被公开。 必填项已用*标注