一, 可观察对象(Observable)
可观察对象支持在应用中的发布者和订阅者之间传递消息.
可观察对象是声明式的 -- 也就是说, 发布者中用于发布值的函数, 只有在有 **** 消费者订阅 ** 它之后才会执行.
可观察对象可以发送多个任意类型的值 -- 字面量, 消息, 事件. 你的应用代码只管订阅并消费这些值就可以了, 做完之后, 取消订阅. 无论这个流是击键流, HTTP 响应流还是定时器, 对这些值进行监听和停止监听的接口都是一样的.
1.1 基本用法和词汇
作为发布者, 你创建一个可观察对象 (Observable) 的实例, 其中定义了一个订阅者 (subscriber) 函数. 订阅者函数用于定义 "如何获取或生成那些要发布的值或消息". 订阅者函数会接收一个 观察者(observer), 并把值发布给观察者的 next() 方法
当有消费者调用 subscribe() 方法时, 这个订阅者函数就会执行. 作为消费者, 要执行所创建的可观察对象, 并开始从中接收通知, 你就要调用可观察对象的 subscribe() 方法, 并传入一个观察者(observer).
观察者是一个 JavaScript 对象, 它定义了你收到的这些消息的处理器(handler).
subscribe() 调用会返回一个 Subscription 对象, 该对象具有一个 unsubscribe() 方法. 当调用该方法时, 你就会停止接收通知.
- // 在有消费者订阅它之前, 这个订阅者函数并不会实际执行
- const locations = new Observable((observer) => {
- const {next, error} = observer;
- let watchId;
- if ('geolocation' in navigator) {
- watchId = navigator.geolocation.watchPosition(next, error);
- } else {
- error('Geolocation not available');
- }
- return {unsubscribe() { navigator.geolocation.clearWatch(watchId); }};
- });
- // subscribe() 调用会返回一个 Subscription 对象, 该对象具有一个 unsubscribe() 方法.
- // subscribe()传入一个观察者对象, 定义了你收到的这些消息的处理器
- const locationsSubscription = locations.subscribe({
- next(position) { console.log('Current Position:', position); },
- error(msg) { console.log('Error Getting Location:', msg); }
- });
- // 10 seconds 后调用该方法时, 你就会停止接收通知.
- setTimeout(() => { locationsSubscription.unsubscribe(); }, 10000);
1.2 定义观察者 observer
通知类型 | 说明 |
---|---|
next | 必要 。用来处理每个送达值。在开始执行后可能执行零次或多次。 |
error | 可选 。用来处理 < strong ztid="153" ow="48" oh="17"> 错误通知 。错误会中断这个可观察对象实例的执行过程。 |
complete | 可选 。用来处理 < strong ztid="158" ow="36" oh="17"> 执行完 毕(complete)通知。当执行完毕后,这些值就会继续传给下一个处理器。 |
1.3 订阅
只有当有人订阅 Observable 的实例时, 订阅者函数才会开始发布值.
订阅时要先调用该实例的 subscribe() 方法, 并把一个观察者对象传给 subscribe(), 用来接收通知.
使用 Observable 上定义的一些静态方法来创建一些常用的简单可观察对象:
of(...items) -- 返回一个 Observable 实例, 它用同步的方式把参数 中提供的这些值发送出来.
from(iterable) -- 把它的参数转换成一个 Observable 实例. 该方法通常用于把一个数组转换成一个 (发送多个值的) 可观察对象.
下面的例子会创建并订阅一个简单的可观察对象, 它的观察者会把接收到的消息记录到控制台中:
- // 创建简单的可观察对象, 来发送 3 个值
- const myObservable = of(1, 2, 3);
- // 创建观察者对象
- const myObserver = {
- next: x => console.log('Observer got a next value:' + x),
- error: err => console.error('Observer got an error:' + err),
- complete: () => console.log('Observer got a complete notification'),
- };
- // 订阅
- myObservable.subscribe(myObserver);
- // Observer got a next value: 1
- // Observer got a next value: 2
- // Observer got a next value: 3
- // Observer got a complete notification
=>前面指定预定义观察者并订阅它, 等同如下写法, 省略了 next,error,complete
- myObservable.subscribe(
- // subscribe() 方法可以接收预定义在观察者中同一行的回调函数
- x => console.log('Observer got a next value:' + x),
- err => console.error('Observer got an error:' + err),
- () => console.log('Observer got a complete notification')
- );
无论哪种情况, next 处理器都是必要的, 而 error 和 complete 处理器是可选的.
注意, next() 函数可以接受消息字符串, 事件对象, 数字值或各种结构. 我们把由可观察对象发布出来的数据统称为流. 任何类型的值都可以表示为可观察对象, 而这些值会被发布为一个流.
1.4 创建可观察对象
要创建一个与前面的 of(1, 2, 3) 等价的可观察对象, 你可以这样做:
- // 订阅者函数会接收一个 Observer 对象, 并把值发布给观察者的 next() 方法.
- function sequenceSubscriber(observer) {
- // 同步地 发布 1, 2, and 3, 然后 complete
- observer.next(1);
- observer.next(2);
- observer.next(3);
- observer.complete();
- // 同步发布数据, 所以取消订阅 不需要做任何事情
- return {unsubscribe() {}};
- }
- // 使用 Observable 构造函数, 创建一个新的可观察对象,
- // 当执行可观察对象的 subscribe() 方法时, 这个构造函数就会把它接收到的参数 sequenceSubscriber 作为订阅者函数来运行.
- const sequence = new Observable(sequenceSubscriber);
- sequence.subscribe({
- next(num) { console.log(num); },
- complete() { console.log('Finished sequence'); }
- });
- // Logs:
- // 1
- // 2
- // 3
- // Finished sequence
下面的例子用来发布事件的可观察对象:
- function fromEvent(target, eventName) {
- return new Observable(
- // new Observable 中传入的订阅者函数是用内联方式定义的
- // 订阅者函数会接收一个 观察者对象 observer, 并把值 e 发布给观察者的 next() 方法
- (observer) => {
- const handler = (e) => observer.next(e);
- // Add the event handler to the target
- target.addEventListener(eventName, handler);
- return () => {
- // Detach the event handler from the target
- target.removeEventListener(eventName, handler);
- };
- }
- );
- }
- const ESC_KEY = 27;
- const nameInput = document.getElementById('name') as HTMLInputElement;
- const subscription = fromEvent(nameInput, 'keydown')// 使用 fromEvent 函数来创建可发布 keydown 事件的可观察对象
- .subscribe(
- // subscribe() 方法接收预定义在观察者中同一行的 next 回调函数
- (e: KeyboardEvent) => {
- if (e.keyCode === ESC_KEY) {
- nameInput.value = '';
- }
- }
- );
1.5 多播?
1.6 错误处理
由于可观察对象会异步生成值, 所以用 try/catch 是无法捕获错误的. 你应该在观察者中指定一个 error 回调来处理错误.
发生错误时还会导致可观察对象清理现有的订阅, 并且停止生成值.
可观察对象可以生成值 (subscribe() 调用 next 回调), 也可以调用 complete 或 error 回调来主动结束.
- myObservable.subscribe({
- next(num) { console.log('Next num:' + num)},
- error(err) { console.log('Received an errror:' + err)}
- });
二, RxJS 库
RxJS 是一个使用可观察对象进行响应式编程的库.
2.1 创建可观察对象的函数
RxJS 提供了一些用来创建可观察对象的函数. 这些函数可以简化根据某些东西创建可观察对象的过程, 比如承诺, 定时器, 事件, Ajax 等等.
承诺
- import { fromPromise } from 'rxjs';
- // Create an Observable out of a promise
- const data = fromPromise(fetch('/api/endpoint'));
- // Subscribe to begin listening for async result
- data.subscribe({
- next(response) { console.log(response); },
- error(err) { console.error('Error:' + err); },
- complete() { console.log('Completed'); }
- });
定时器
- import { interval } from 'rxjs';
- // Create an Observable that will publish a value on an interval
- const secondsCounter = interval(1000);
- // Subscribe to begin publishing values
- secondsCounter.subscribe(n =>
- console.log(`It's been ${n} seconds since subscribing!`));
事件
- import { fromEvent } from 'rxjs';
- const el = document.getElementById('my-element');
- // Create an Observable that will publish mouse movements
- const mouseMoves = fromEvent(el, 'mousemove');
- // Subscribe to start listening for mouse-move events
- const subscription = mouseMoves.subscribe((evt: MouseEvent) => {
- // Log coords of mouse movements
- console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);
- // When the mouse is over the upper-left of the screen,
- // unsubscribe to stop listening for mouse movements
- if (evt.clientX <40 && evt.clientY < 40) {
- subscription.unsubscribe();
- }
- });
- Ajax
- import {
- Ajax
- } from 'rxjs/ajax';
- // Create an Observable that will create an Ajax request
- const apiData = Ajax('/api/data');
- // Subscribe to create the request
- apiData.subscribe(res => console.log(res.status, res.response));
2.2 常用操作符
操作符会观察来源可观察对象中发出的值, 转换它们, 并返回由转换后的值组成的新的可观察对象.
提倡使用管道来组合操作符, 而不是使用链式写法
- import { filter, map } from 'rxjs/operators';
- const squareOdd = of(1, 2, 3, 4, 5) // 可观察对象
- .pipe(
- filter(n => n % 2 !== 0),
- map(n => n * n)
- );
- // Subscribe to get values
- squareOdd.subscribe(x => console.log(x));
类别 | 操作 |
---|---|
创建 | from, fromPromise,fromEvent, of |
组合 | combineLatest, concat, merge, startWith , withLatestFrom, zip |
过滤 | debounceTime, distinctUntilChanged, filter, take, takeUntil |
转换 | bufferTime, concatMap, map, mergeMap, scan, switchMap |
工具 | tap |
多播 | share |
操作符参考资料 http://rxmarbles.com/
Rxjs 常用操作符
2.3 错误处理
除了可以在订阅时提供 error() 处理器外, RxJS 还提供了 catchError 操作符, 它允许你在管道中处理已知错误. 下面是使用 catchError 操作符实现这种效果的例子:
- import { Ajax } from 'rxjs/ajax';
- import { map, catchError } from 'rxjs/operators';
- // Return "response" from the API. If an error happens,
- // return an empty array.
- const apiData = Ajax('/api/data').pipe(
- map(res => {
- if (!res.response) {
- throw new Error('Value expected!');
- }
- return res.response;
- }),
- // 如果你捕获这个错误并提供了一个默认值, 流就会继续处理这些值, 而不会报错.
- catchError(err => of([]))
- );
- apiData.subscribe({
- next(x) { console.log('data:', x); },
- error(err) { console.log('errors already caught... will not run'); }
- });
2.4 重试失败的可观察对象
可以在 catchError 之前使用 retry 操作符. 下列代码为前面的例子加上了捕获错误前重发请求的逻辑:
- import { Ajax } from 'rxjs/ajax';
- import { map, retry, catchError } from 'rxjs/operators';
- const apiData = Ajax('/api/data').pipe(
- retry(3), // Retry up to 3 times before failing
- map(res => {
- if (!res.response) {
- throw new Error('Value expected!');
- }
- return res.response;
- }),
- catchError(err => of([]))
- );
- apiData.subscribe({
- next(x) { console.log('data:', x); },
- error(err) { console.log('errors already caught... will not run'); }
- });
2.5 可观察对象的命名约定
习惯上的可观察对象的名字以 "$" 符号结尾.
stopwatchValue$: Observable<number>;
三, Angular 中的可观察对象
Angular 使用可观察对象作为处理各种常用异步操作的接口. 比如:
EventEmitter 类派生自 Observable.
HTTP 模块使用可观察对象来处理 Ajax 请求和响应.
路由器和表单模块使用可观察对象来监听对用户输入事件的响应.
3.1 事件发送器 EventEmitter
Angular 提供了一个 EventEmitter 类, 它用来从组件的 @Output() 属性中发布一些值. EventEmitter 扩展了 Observable, 并添加了一个 emit() 方法, 这样它就可以发送任意值了. 当你调用 emit() 时, 就会把所发送的值传给订阅上来的观察者的 next() 方法.
- @Output() open = new EventEmitter<any>();
- toggle() {
- this.open.emit(null);
- }
3.2HTTP
Angular 的 HttpClient 从 HTTP 方法调用中返回了可观察对象. 例如, http.get('/api') 就会返回可观察对象.
相对于基于承诺 (Promise) 的 HTTP API, 它有一系列优点:
可观察对象不会修改服务器的响应(和在承诺上串联起来的 .then() 调用一样). 反之, 你可以使用一系列操作符来按需转换这些值.
HTTP 请求是可以通过 unsubscribe() 方法来取消的.
请求可以进行配置, 以获取进度事件的变化.
失败的请求很容易重试.
3.3Async 管道
AsyncPipe 会订阅一个可观察对象或承诺, 并返回其发出的最后一个值. 当发出新值时, 该管道就会把这个组件标记为需要进行变更检查的
3.4 路由器 (router)
Router.events 以可观察对象的形式提供了其事件. 你可以使用 RxJS 中的 filter() 操作符来找到感兴趣的事件, 并且订阅它们, 以便根据浏览过程中产生的事件序列作出决定. 例子如下:
- import { Router, NavigationStart } from '@angular/router';
- import { filter } from 'rxjs/operators';
- @Component({
- selector: 'app-routable',
- templateUrl: './routable.component.html',
- styleUrls: ['./routable.component.css']
- })
- export class Routable1Component implements OnInit {
- navStart: Observable<NavigationStart>;
- constructor(private router: Router) {
- // Create a new Observable the publishes only the NavigationStart event
- this.navStart = router.events.pipe(
- filter(evt => evt instanceof NavigationStart)
- ) as Observable<NavigationStart>;
- }
- ngOnInit() {
- this.navStart.subscribe(evt => console.log('Navigation Started!'));
- }
- }
ActivatedRoute 是一个可注入的路由器服务, 它使用可观察对象来获取关于路由路径和路由参数的信息. 比如, ActivateRoute.url 包含一个用于汇报路由路径的可观察对象. 例子如下:
- import { ActivatedRoute } from '@angular/router';
- @Component({
- selector: 'app-routable',
- templateUrl: './routable.component.html',
- styleUrls: ['./routable.component.css']
- })
- export class Routable2Component implements OnInit {
- constructor(private activatedRoute: ActivatedRoute) {}
- ngOnInit() {
- this.activatedRoute.url
- .subscribe(url => console.log('The URL changed to:' + url));
- }
- }
3.5 响应式表单 (reactive forms)
响应式表单具有一些属性, 它们使用可观察对象来监听表单控件的值. FormControl 的 valueChanges 属性和 statusChanges 属性包含了会发出变更事件的可观察对象. 订阅可观察的表单控件属性是在组件类中触发应用逻辑的途径之一. 比如:
- import { FormGroup } from '@angular/forms';
- @Component({
- selector: 'my-component',
- template: 'MyComponent Template'
- })
- export class MyComponent implements OnInit {
- nameChangeLog: string[] = [];
- heroForm: FormGroup;
- ngOnInit() {
- this.logNameChange();
- }
- logNameChange() {
- const nameControl = this.heroForm.get('name');
- nameControl.valueChanges.subscribe(
- (value: string) => this.nameChangeLog.push(value)
- );
- }
- }
四, 可观察对象与其它技术的比较
4.1 可观察对象 vs. 承诺
可观察对象 | 承诺 | Observable 优势 |
---|---|---|
可观察对象是 < strong ztid="313" ow="36" oh="17"> 声明式 的,在被订阅之前,它不会开始执行。 | 承诺是在创建时就 < strong ztid="315" ow="120" oh="41"> 立即执行 的。 | 这让可观察对象可用于定义那些应该 < strong ztid="317" ow="48" oh="17"> 按需执行 的情景。 |
可观察对象能提供 < strong ztid="320" ow="36" oh="17"> 多个值 。 | 承诺只提供 < strong ztid="322" ow="24" oh="17"> 一个 。 | 这让可观察对象可用于随着时间的推移获取多个值。 |
可观察对象会区分串联处理和订阅语句。 | 承诺 < strong ztid="327" ow="24" oh="17"> 只有 .then() 语句。 | 这让可观察对象可用于创建供系统的其它部分使用而不希望立即执行的复杂菜谱。 |
可观察对象的 subscribe() 会负责处理错误。 | 承诺会把错误推送给它的 < strong ztid="332" ow="36" oh="17"> 子承诺 。 | 这让可观察对象可用于进行集中式、可预测的错误处理。 |
4.2 创建与订阅
在有消费者订阅之前, 可观察对象不会执行. subscribe() 会执行一次定义好的行为, 并且可以再次调用它. 重新订阅会导致重新计算这些值.
- content_copy
- // declare a publishing operation
- new Observable((observer) => { subscriber_fn });
- // initiate execution
- observable.subscribe(() => {
- // observer handles notifications
- });
承诺会立即执行, 并且只执行一次. 当承诺创建时, 会立即计算出结果. 没有办法重新做一次. 所有的 then 语句 (订阅) 都会共享同一次计算.
- content_copy
- // initiate execution
- new Promise((resolve, reject) => { executer_fn });
- // handle return value
- promise.then((value) => {
- // handle result here
- });
4.3 串联
可观察对象会区分各种转换函数, 比如映射和订阅. 只有订阅才会激活订阅者函数, 以开始计算那些值.
- content_copy
- observable.map((v) => 2*v);
承诺并不区分最后的 .then() 语句 (等价于订阅) 和中间的 .then() 语句(等价于映射).
- content_copy
- promise.then((v) => 2*v);
4.4 可取消
可观察对象的订阅是可取消的. 取消订阅会移除监听器, 使其不再接受将来的值, 并通知订阅者函数取消正在进行的工作.
- content_copy
- const sub = obs.subscribe(...);
- sub.unsubscribe();
承诺是不可取消的.
4.5 错误处理
可观察对象的错误处理是交给订阅者的错误处理器的, 并且该订阅者会自动取消对这个可观察对象的订阅.
- content_copy
- obs.subscribe(() => {
- throw Error('my error');
- });
承诺会把错误推给其子承诺.
- content_copy
- promise.then(() => {
- throw Error('my error');
- });
4.6 速查表
4.7 可观察对象 vs. 事件 API
4.8 可观察对象 vs. 数组
来源: https://juejin.im/post/5c171285e51d4512d945d8a3