上一篇博客提到了几种响应式的方案, 以及它们的缺点. 本文将介绍 Observable 以及它的一个实现, 以及它在处理响应式时相对于上篇博客中的方案的巨大优势(推荐两篇博客对比阅读).
Observable 是一个集合了观察者模式, 迭代器模式和函数式的库, 提供了基于事件流的强大的异步处理能力, 并且已在 Stage 1 草案中. 本文介绍的 Rxjs 是 Observable 的一个实现, 它是 ReactiveX http://reactivex.io/ 众多语言中的 JavaScript 版本.
在 JavaScript 中, 我们可以使用 T | null 去处理一个单值, 使用 Iterator 去处理多个值得情况, 使用 Promise 处理异步的单个值, 而 Observable 则填补了缺失的 "异步多个值".
单个值 | 多个值 | |
---|---|---|
同步 | T | null | Iterator<T> |
异步 | Promise<T> | Observable<T> |
使用 Rxjs
上文提到使用 Event Emitter 做响应式处理, 在 Rxjs 中稍有些不同:
- /*
- const change$ = new Subject();
- <Input change$={change$} />
- <Search change$={change$} />
- */
- class Input extends Component {
- state = {
- value: ''
- };
- onChange = e => {
- this.props.change$.next(e.target.value);
- };
- componentDidMount() {
- this.subscription = this.props.change$.subscribe(value => {
- this.setState({
- value
- });
- });
- }
- componentWillUnmount() {
- this.subscription.ubsubscribe();
- }
- render() {
- const { value } = this.state;
- return <input value={value} onChange={this.onChange} />;
- }
- }
- class Search extends Component {
- // ...
- componentDidMount() {
- this.subscription = this.props.change$.subscribe(value => {
- ajax(/* ... */).then(list =>
- this.setState({
- list
- })
- );
- });
- }
- componentWillUnmount() {
- this.subscription.ubsubscribe();
- }
- render() {
- const { list } = this.state;
- return <ul>{list.map(item => <li key={item.id}>{item.value}</li>)}</ul>;
- }
- }
在这里, 我们虽然也需要手动释放对事件的订阅, 但是得益于 Rxjs 的设计, 我们不需要像 Event Emitter 那样去存下回调函数的实例, 用于释放订阅, 因此我们很容易就可以通过高阶组件解决这个问题. 例如:
- const withObservables = observables => ChildComponent => {
- return class extends Component {
- constructor(props) {
- super(props);
- this.subscriptions = {};
- this.state = {};
- Object.keys(observables).forEach(key => {
- this.subscriptions[key] = observables[key].subscribe(value => {
- this.setState({
- [key]: value
- });
- });
- });
- }
- onNext = (key, value) => {
- observables[key].next(value);
- };
- componentWillUnmount() {
- Object.keys(this.subscriptions).forEach(key => {
- this.subscriptions[key].unsubscribe();
- });
- }
- render() {
- return (
- <ChildComponent {...this.props} {...this.state} onNext={this.onNext} />
- );
- }
- };
- };
这样在需要聚合多个数据源时, 也不会像 Event Emitter 那样手动释放资源造成麻烦. 同时, 在 Rxjs 中我们还有专用于聚合数据源的方法:
- Observable.combineLatest(foo$, bar$)
- .pipe(
- // ...
- );
显然相对于 Event Emitter 的方式十分高效, 同时它相对于 Mobx 也有巨大的优势. 在 Mobx 中, 我们提到需要聚合多个数据源的时候, 采用 autoRun 的方式容易收集到不必要的依赖, 使用 observe 则不够高效. 在 Rxjs 中, 显然不会有这些问题, combineLatest 可以以很简练的方式声明需要聚合的数据源, 同时, 得益于 Rxjs 设计, 我们不需要像 Mobx 一个一个去调用 observe 返回的析构, 只需要处理每一个 subscribe 返回的 subscription:
- class Foo extends Component {
- constructor(props) {
- super(props);
- this.subscription = Observable.combineLatest(foo$, bar$)
- .pipe(
- // ...
- )
- .subscribe(() => {
- // ...
- });
- }
- componentWillUnmount() {
- this.subscription.unsubscribe();
- }
- }
异步处理
Rxjs 使用操作符去描述各种行为, 每一个操作符会返回一个新的 Observable, 我们可以对它进行后续的操作. 例如, 使用 map 操作符就可以实现对数据的转换:
foo$.map(event => event.target.value);
Rxjs 5.5 之后所有的 Observable 上都引入了一个 pipe 方法, 接收若干个操作符, pipe 方法会返回一个 Observable. 因此, 我们可以很容易配合 tree shaking 实现对操作符的按需引入, 而不是把整个 Rxjs 引入进来:
- import { map } from 'rxjs/operators';
- foo$.pipe(map(event => event.target.value));
推荐使用这种写法.
在讨论面向对象的响应式的响应式中, 我们提到对于异步的问题, 面向对象的方式不好处理. 在 Observable 中我们可以通过 switchMap 操作符处理异步问题, 一个异步搜索看起来会是这样:
input$.pipe(switchMap(keyword => Observable.ajax(/* ... */)));
在处理异步单值时, 我们可以使用 Promise, 而 Observable 用于处理异步多个值, 我们可以很容易把一个 Promise 转成一个 Observable, 从而复用已有的异步代码:
input$.pipe(switchMap(keyword => fromPromise(search(/* ... */))));
switchMap 接受一个返回 Observable 的函数作为参数, 下游的流就会切到这个返回的 Observable. 而要聚合多个数据源并做异步处理时:
- combineLatest(foo$, bar$).pipe(
- switchMap(keyword => fromPromise(someAsyncOperation(/* ... */)))
- );
同时, 由于标准制定的 Promise 是没有 cancel 方法的, 有时候我们要取消异步方法的时候就有些麻烦(主要是为了解决一些并发安全问题).switchMap 当上游有新值到来时, 会忽略结束已有未完成的 Observable 然后调用函数返回一个新的 Observable, 我们只使用一个函数就解决了并发安全问题. 当然, 我们可以根据实际需要选用 switchMap,mergeMap,concatMap,exhaustMap 等.
而对于时间轴的操作, Rxjs 也有巨大优势. 上篇博客中提到当我们需要延时 5 秒做操作时, 无论是 Event Emitter 还是面向对象的方式都力不从心, 而在 Rxjs 中我们只需要一个 delay 操作符即可解决问题:
- input$.pipe(
- delay(5000) // 下游会在 input$ 值到来后 5 秒才接到数据
- );
用 Rxjs 处理数据
在实际开发过程中, 事件不能解决所有问题, 我们往往会需要存储数据, 而 Observable 被设计成用于处理事件, 因此它有很多符合事件直觉的设计.
Observable 被设计为懒 (lazy) 的, 当当没有订阅者时, 一个流不会执行. 对于事件而言, 没有事件的消费者那么不执行也不会有问题. 而在 GUI 中, 订阅者可能是 View:
- class View extends Component {
- state = {
- input: ''
- };
- componentDidMount() {
- this.subscription = input$.subscribe(input => {
- this.setState({
- input
- });
- });
- }
- componentWillUnmount() {
- this.subscription.unsubscribe();
- }
- render() {
- // ...
- }
- }
由于这个 View 可能不存在, 例如路由被切走了, 那么我们的事件源就没有了订阅者, 他就不会运行. 但是我们希望在路由被且走后, 后台的数据依然会继续.
对于事件而言, 在事件发生之后的订阅者不会受到订阅之前的逻辑. 例如在 EventEmitter 中:
- eventEmitter.emit('hello', 1);
- // ...
- eventEmitter.on('hello', function listener() {});
由于 listener 是在 hello 事件发生后在监听的, 不会收到值为 1 的事件. 但是这在处理数据的时候会造成麻烦, 我们的数据在 View 被卸载 (例如路由切走) 后丢失.
同时, 由于 Observable 没有提供直接取到内部状态的方法, 当我们使用 Observable 处理数据时, 我们不方便随时拿到数据. 那有办法解决这个问题, 从而使 Observable 强大抽象能力去赋能数据层呢?
回到 Redux.Redux 的事件 (Action) 其实是一个事件流, 那么我们就可以很自然地把 Redux 的事件流融入到 Rxjs 流中:
- () => next => {
- const action$ = new Subject();
- return action => {
- action$.next(action);
- // ...
- };
- };
通过这样的封装, https://redux-observable.js.org 就能让我们把 Observable 强大的事件描述和处理能力和 Redux 结合. 我们可以非常方便地根据 Action 去处理副作用:
- action$.pipe(
- ofType('ACTION_1'),
- switchMap(() => {
- // ...
- }),
- map(res => ({
- type: 'ACTION_2',
- payload: res
- }))
- );
- action$.pipe(
- ofType('ACTION_3'),
- mergeMap(() => {
- // ...
- }),
- map(res => ({
- type: 'ACTION_4',
- payload: res
- }))
- );
Redux Observable 使我们可以结合 Redux 和Observable. 在这里, Action 被视作一个流, ofType 相当于
filter(action => action.type === 'SOME_ACTION')
, 从而得到需要监听的Action, 得益于 Redux 的设计, 我们可以通过监听 Action去完成副作用的处理或者监听数据变化. 最后这个流返回一个新的 Action 流, Redux Observable 会把这个新的 Action 流中的 Action dispatch 出去. 由此, 我们在使用 Redux 存储数据的基础上获得了 Rxjs 对异步事件的强大处理能力.
本文首发于(https://tech.youzan.com/reactive2/)[有赞技术博客].
来源: https://juejin.im/post/5b30662e51882574d02fb063