RxJS教程

RxJS教程

入门


基本概念:

  • Observable(可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer(观察者): 一个回调函数的集合,它知道如何去监听由Observable提供的值。
  • **Subscription(订阅):**表示Observable的执行,它主要用于取消Observable的执行。
  • Operator(操作符): 采用函数式编程风格的纯函数,使用像map、filter、concat、flatMap、等这样的操作符来处理集合。
  • Subject(主体): 相当于EventEmitter,并且是将值或事件多路推送给多个Observer的唯一方式。
  • Schdulers(调度器)::用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如setTimeout或requestAnimationFrame 或其他。

Observable(可观察对象)


Observables 是多个值的惰性推送集合。它填补了下面表格中的空白:

单个值 多个值
拉取 Function
推送 Promise

拉取(Pull)vs. 推送(Push)

拉取和推送是两种不同的协议,用来描述数据生产者如何与数据消费者进行通信的。

拉取? 由消费者来决定何时从生产者那接收数据,生产者本身不知道数据何时交付到消费者手中的。

每个Javascript函数都是拉取体系。函数式数据的生产者,调用该函数的代码通过从函数调用中取出一个单个返回值来对该函数进行消费。

生产者 消费者
拉取 被动的: 当被请求时产生数据。
推送 主动的: 按自己的节奏产生数据。

推送? 由生产者来决定何时吧数据发给消费者。消费者本身不知道何时后接受数据

Promise是最常见的推送体系类型。Promise(生产者) 将一个解析过的值传递给已注册的回调函数(消费者),但不同于函数的是,由 Promise 来决定何时把值“推送”给回调函数。

RxJS引入了Observables,一个新的javascript推送体系。Observable是多个值得生产者,并将值推送给观察者(消费者)

  • Function 是惰性的评估运算,调用时会同步地返回一个单一值
  • Generator 是惰性的评估运算,调用时会同步地返回零到无限多个值
  • Promise 是最终可能返回一个值得运算
  • Observable 是惰性评估运算,它可以从它被调用的时刻起或异步地返回零到无限多个值。

Observable 剖析

  • 创建Observable
  • 订阅Observable
  • 执行Observable
  • 清理Observable

创建Observable

RX.Observable.create 是Observable构造函数的别名,它接收一个参数subscribe函数。

// 生产者
var observable = RX.Observable.create(function subscribe(observer){
	var id = setInterval(()=>{
		observer.next('hi')
	},1000);
})
复制代码

Observable可以使用create来创建,但通常我们使用所谓的创建操作符,像of、from、interval、等等

订阅Observable

// 观察者
observable.subscribe(x=>console.log(x))
复制代码

这表明 subscribe 调用在同一 Observable 的多个观察者之间是不共享的.对 observable.subscribe 的每次调用都会触发针对给定观察者的独立设置。Observable 甚至不会去维护一个附加的观察者列表。

执行Observable

Observable.create(function subscribe(observer) {…}) 中…的代码表示 “Observable 执行”,它是惰性运算,只有在每个观察者订阅后才会执行。随着时间的推移,执行会以同步或异步的方式产生多个值。

Observable 执行可以传递三种类型的值:

  • “Next” 通知: 发送一个值,比如数字、字符串、对象,等等。
  • “Error” 通知: 发送一个 JavaScript 错误 或 异常。
  • “Complete” 通知: 不再发送任何值。

下面是 Observable 执行的示例,它发送了三个 “Next” 通知,然后是 “Complete” 通知:

var observable = Rx.Observable.create(function subscribe(observer) {
	observer.next(1);
	observer.next(2);
	observer.next(3);
	observer.complete();
});
复制代码

Observable 严格遵守自身的规约,所以下面的代码不会发送 “Next” 通知 4:

var observable = Rx.Observable.create(function subscribe(observer) {
	observer.next(1);
	observer.next(2);
	observer.next(3);
	observer.complete();
	observer.next(4); // 因为违反规约,所以不会发送
});
复制代码

在 subscribe 中用 try/catch 代码块来包裹任意代码是个不错的主意,如果捕获到异常的话,会发送 “Error” 通知:

var observable = Rx.Observable.create(function subscribe(observer) {
	try {
		observer.next(1);
		observer.next(2);
		observer.next(3);
		observer.complete();
	} catch (err) {
		observer.error(err); // 如果捕获到异常会发送一个错误
	}
});
复制代码

清理Observable

因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源。

当调用了 observable.subscribe ,观察者会被附加到新创建的 Observable 执行中。这个调用还返回一个对象,即 Subscription (订阅):

var subscription = observable.subscribe(x => console.log(x));
复制代码

Subscription 表示进行中的执行,它有最小化的 API 以允许你取消执行。想了解更多订阅相关的内容,请参见 Subscription 类型。使用 subscription.unsubscribe() 你可以取消进行中的执行:

var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// 稍后:
subscription.unsubscribe();
复制代码

当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用 unsubscribe() 方法就可以取消执行。

Observer (观察者)


什么是观察者? – 观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:next、error 和 complete 。下面的示例是一个典型的观察者对象:

var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
observable.subscribe(observer);
复制代码

观察者只是有三个回调函数的对象,每个回调函数对应一种 Observable 发送的通知类型。

Subscription(订阅)


什么是 Subscription ? – Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe,它不需要任何参数,只是用来清理由 Subscription 占用的资源。在上一个版本的 RxJS 中,Subscription 叫做 “Disposable” (可清理对象)。

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// 稍后:
// 这会取消正在进行中的 Observable 执行
// Observable 执行是通过使用观察者调用 subscribe 方法启动的
subscription.unsubscribe();
//Subscription 还可以合在一起,这样一个 Subscription 调用 unsubscribe() 方法,可能会有多个 Subscription 取消订阅 。你可以通过把一个 Subscription 添加到另一个上面来做这件事:
var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
// subscription 和 childSubscription 都会取消订阅
subscription.unsubscribe();
}, 1000);
复制代码

Subscriptions 还有一个 remove(otherSubscription) 方法,用来撤销一个已添加的子 Subscription 。

Subject(主体)


什么是 Subject? – RxJS Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。

Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。

每个Subject都是Observable -对于Subject,你可以提供一个观察者并使用subscribe方法,就可以开始正常接收值。从观察者角度而言,它无法判断Observable执行来自普通的Observable还是Subject。

在 Subject 的内部,subscribe 不会调用发送值的新执行。它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 addListener 的工作方式。

每个 Subject 都是观察者。 -Subject 是一个有如下方法的对象: next(v)、error(e) 和 complete() 。要给 Subject 提供新值,只要调用 next(theValue),它会将值多播给已注册监听该 Subject 的观察者们。

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);
复制代码

因为 Subject 是观察者,这也就在意味着你可以把 Subject 作为参数传给任何 Observable 的 subscribe 方法,如下面的示例所展示的:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // 你可以提供一个 Subject 进行订阅

// 结果
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
复制代码

使用上面的方法,我们基本上只是通过 Subject 将单播的 Observable 执行转换为多播的。这也说明了 Subjects 是将任意 Observable 执行共享给多个观察者的唯一方式。

还有一些特殊类型的 Subject:BehaviorSubject、ReplaySubject 和 AsyncSubject。

多播的 Observables

“多播 Observable” 通过 Subject 来发送通知,这个 Subject 可能有多个订阅者,然而普通的 “单播 Observable” 只发送通知给单个观察者。

多播 Observable 在底层是通过使用 Subject 使得多个观察者可以看见同一个 Observable 执行。

在底层,这就是 multicast 操作符的工作原理:观察者订阅一个基础的 Subject,然后 Subject 订阅源 Observable 。下面的示例与前面使用 observable.subscribe(subject) 的示例类似:

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// 在底层使用了 `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// 在底层使用了 `source.subscribe(subject)`:
multicasted.connect();
复制代码

multicast 操作符返回一个 Observable,它看起来和普通的 Observable 没什么区别,但当订阅时就像是 Subject 。multicast 返回的是 ConnectableObservable,它只是一个有 connect() 方法的 Observable 。

connect() 方法十分重要,它决定了何时启动共享的 Observable 执行。因为 connect() 方法在底层执行了 source.subscribe(subject),所以它返回的是 Subscription,你可以取消订阅以取消共享的 Observable 执行。

引用计数

手动调用 connect() 并处理 Subscription 通常太笨重。通常,当第一个观察者到达时我们想要自动地连接,而当最后一个观察者取消订阅时我们想要自动地取消共享执行。

请考虑以下示例,下面的列表概述了 Subscriptions 发生的经过:

  1. 第一个观察者订阅了多播 Observable
  2. 多播 Observable 已连接
  3. next 值 0 发送给第一个观察者
  4. 第二个观察者订阅了多播 Observable
  5. next 值 1 发送给第一个观察者
  6. next 值 1 发送给第二个观察者
  7. 第一个观察者取消了多播 Observable 的订阅
  8. next 值 2 发送给第二个观察者
  9. 第二个观察者取消了多播 Observable 的订阅
  10. 多播 Observable 的连接已中断(底层进行的操作是取消订阅)

要实现这点,需要显式地调用 connect(),代码如下:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
// 这里我们应该调用 `connect()`,因为 `multicasted` 的第一个
// 订阅者关心消费值
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);

// 这里我们应该取消共享的 Observable 执行的订阅,
// 因为此后 `multicasted` 将不再有订阅者
setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); // 用于共享的 Observable 执行
}, 2000);
复制代码

如果不想显式调用 connect(),我们可以使用 ConnectableObservable 的 refCount() 方法(引用计数),这个方法返回 Observable,这个 Observable 会追踪有多少个订阅者。当订阅者的数量从0变成1,它会调用 connect() 以开启共享的执行。当订阅者数量从1变成0时,它会完全取消订阅,停止进一步的执行。

refCount 的作用是,当有第一个订阅者时,多播 Observable 会自动地启动执行,而当最后一个订阅者离开时,多播 Observable 会自动地停止执行。

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

// 这里其实调用了 `connect()`,
// 因为 `refCounted` 有了第一个订阅者
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

// 这里共享的 Observable 执行会停止,
// 因为此后 `refCounted` 将不再有订阅者
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

// 执行结果:
observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed
复制代码

refCount() 只存在于 ConnectableObservable,它返回的是 Observable,而不是另一个 ConnectableObservable 。

BehaviorSubject

Subject 的其中一个变体就是 BehaviorSubject,它有一个“当前值”的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”。

BehaviorSubjects 适合用来表示“随时间推移的值”。举例来说,生日的流是一个 Subject,但年龄的流应该是一个 BehaviorSubject 。

在下面的示例中,BehaviorSubject 使用值0进行初始化,当第一个观察者订阅时会得到0。第二个观察者订阅时会得到值2,尽管它是在值2发送之后订阅的。

var subject = new Rx.BehaviorSubject(0); // 0是初始值

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);
复制代码

输出:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
复制代码

ReplaySubject

ReplaySubject 类似于 BehaviorSubject,它可以发送旧值给新的订阅者,但它还可以记录 Observable 执行的一部分。

ReplaySubject 记录 Observable 执行中的多个值并将其回放给新的订阅者。

var subject = new Rx.ReplaySubject(3); // 为新的订阅者缓冲3个值

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5); 
复制代码

除了缓冲数量,你还可以指定 window time (以毫秒为单位)来确定多久之前的值可以记录。在下面的示例中,我们使用了较大的缓存数量100,但 window time 参数只设置了500毫秒。

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

// 输出:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...
复制代码

AsyncSubject

AsyncSubject 是另一个 Subject 变体,只有当 Observable 执行完成时(执行 complete()),它才会将执行的最后一个值发送给观察者。

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();
复制代码

输出:

observerA: 5
observerB: 5
复制代码

AsyncSubject 和 last() 操作符类似,因为它也是等待 complete 通知,以发送一个单个值。

Operators (操作符)

尽管 RxJS 的根基是 Observable,但最有用的还是它的操作符。操作符是允许复杂的异步代码以声明式的方式进行轻松组合的基础代码单元。

什么是操作符?

操作符是 Observable 类型上的方法,比如 .map(…)、.filter(…)、.merge(…),等等。当操作符被调用时,它们不会改变已经存在的 Observable 实例。相反,它们返回一个新的 Observable ,它的 subscription 逻辑基于第一个 Observable 。

操作符是函数,它基于当前的 Observable 创建一个新的 Observable。这是一个无副作用的操作:前面的 Observable 保持不变。

操作符本质上是一个纯函数 (pure function),它接收一个 Observable 作为输入,并生成一个新的 Observable 作为输出。订阅输出 Observable 同样会订阅输入 Observable 。在下面的示例中,我们创建一个自定义操作符函数,它将从输入 Observable 接收的每个值都乘以10:

function multiplyByTen(input) {
  var output = Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
  return output;
}

var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));
复制代码

输出:

10
20
30
40
复制代码

注意,订阅 output 会导致 input Observable 也被订阅。我们称之为“操作符订阅链”。

实例操作符 vs. 静态操作符

什么是实例操作符? – 通常提到操作符时,我们指的是实例操作符,它是 Observable 实例上的方法。举例来说,如果上面的 multiplyByTen 是官方提供的实例操作符,它看起来大致是这个样子的:

Rx.Observable.prototype.multiplyByTen = function multiplyByTen() {
  var input = this;
  return Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
}
复制代码

实例运算符是使用 this 关键字来指代输入的 Observable 的函数。

注意,这里的 input Observable 不再是一个函数参数,它现在是 this 对象。下面是我们如何使用这样的实例运算符:

var observable = Rx.Observable.from([1, 2, 3, 4]).multiplyByTen();

observable.subscribe(x => console.log(x));
复制代码

什么是静态操作符? – 除了实例操作符,还有静态操作符,它们是直接附加到 Observable 类上的。静态操作符在内部不使用 this 关键字,而是完全依赖于它的参数。

静态操作符是附加到 Observalbe 类上的纯函数,通常用来从头开始创建 Observalbe 。

最常用的静态操作符类型是所谓的创建操作符。它们只接收非 Observable 参数,比如数字,然后创建一个新的 Observable ,而不是将一个输入 Observable 转换为输出 Observable 。

一个典型的静态操作符例子就是 interval 函数。它接收一个数字(非 Observable)作为参数,并生产一个 Observable 作为输出:

var observable = Rx.Observable.interval(1000 /* 毫秒数 */);
复制代码

创建操作符的另一个例子就是 create,已经在前面的示例中广泛使用。点击这里查看所有静态操作符列表。

然而,有些静态操作符可能不同于简单的创建。一些组合操作符可能是静态的,比如 merge、combineLatest、concat,等等。这些作为静态运算符是有道理的,因为它们将多个 Observables 作为输入,而不仅仅是一个,例如:

var observable1 = Rx.Observable.interval(1000);
var observable2 = Rx.Observable.interval(400);

var merged = Rx.Observable.merge(observable1, observable2);
复制代码

要解释操作符是如何工作的,文字描述通常是不足以描述清楚的。许多操作符都是跟时间相关的,它们可能会以不同的方式延迟(delay)、取样(sample)、节流(throttle)或去抖动值(debonce)。图表通常是更适合的工具。弹珠图是操作符运行方式的视觉表示,其中包含输入 Obserable(s) (输入可能是多个 Observable )、操作符及其参数和输出 Observable 。

在弹珠图中,时间流向右边,图描述了在 Observable 执行中值(“弹珠”)是如何发出的。

在下图中可以看到解剖过的弹珠图。

在整个文档站中,我们广泛地使用弹珠图来解释操作符的工作方式。它们在其他环境中也可能非常有用,例如在白板上,甚至在我们的单元测试中(如 ASCII 图)。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/101553.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号