理解 Observable 与 Observer

Observable,Observer 是 RxJS 编程中的核心,也是理解 Subject 的基础。

本文就是帮助你理解两者的区别与基本用法。

1.什么是 Observable

图片源自官方文档 —— Observable 一节。

可以看到,Observable 对象可以生成数据流,为 Observer 提供输入。

同时,Observable 对象通过可叠加多个转换(Transformation),对数据流进行转换,并且得到的仍是一个 Observable 对象。

2.什么是 Observer

和 Observable 提供数据流相反,

Observer 负责消费 Observable 提供的数据流。

Observer 通过实现三个回调函数 next,error,和 complete ,

来响应 Observable 的中的异步消息。

3.使用 Observable 和 Observer

利用 Observable 和 Observer 来处理数据流通常包含三个步骤,

分别为创建 Observable、订阅 Observable 和 取消订阅 Observable。

3.1.创建 Observable

var myObservable = Observable.create(function (observer) {
  setTimeout(() => {
    observer.next(1);
  }, 1000);

  setTimeout(() => {
    observer.next(2);
  }, 2000);

  setTimeout(() => {
    observer.next(3);
  }, 3000);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 4000);
});

create 方法只是 Observable 创建方式的其中一种,其他还有 of, from,interval 等方法。

同时如第 1 节所示,一些 RxJS 操作符也可以作用在 Observable 上,持续对数据流进行加工。

3.2.订阅 Observable

订阅 Observable 需要调用 Observable 对象的 subscribe 方法,并传入 observer 作为参数。

这里 subscribe 方法的返回值被成为 Subscription,即一个订阅。它只有一个方法 unsubscribe,用来取消订阅(如 3.3 节所示)。

var observer1 = {
  next: x => console.log('Observer1 got a next value: ' + x),
  error: err => console.error('Observer1 got an error: ' + err),
  complete: () => console.log('Observer1 got a complete notification'),
};

var subscription1 = myObservable.subscribe(observer1);

Observer 可以这样显式的定义,也可以直接在 subscribe 中定义。

var subscription2 = myObservable.subscribe({
  next: x => console.log('Observer2 got a next value: ' + x),
  error: err => console.error('Observer2 got an error: ' + err),
  complete: () => console.log('Observer2 got a complete notification'),
});

3.3.取消订阅 Observable

可以通过取消订阅的方式来结束对数据流的处理,可以释放系统运行资源。

  subscription1.unsubscribe();
  subscription2.unsubscribe(); 

最后,可以结合这里的在线例子加深对 Observable 与 Observer 的理解。

参考文献

RxJS 官方文档 Observable,Observer,Disposing Observable Executions

ReactiveX 官方文档 Observable

推荐阅读

理解 Subject

发表评论

电子邮件地址不会被公开。 必填项已用*标注