一 concat,merge,zip,combineLatest 等合并类操作符
以上操作符在版本 6 中已经只存在静态方法, 不能在 pipe 中使用.
import {concat,merge,zip,combineLatest}
1.concat (obs1,obs2,obs3) 首尾相连
依次将多个 observable 首尾合并, 必须在第一个 obs1 的数据全部完成, 才能进行第二个 obs2, 如果第一个为 interval(1000), 那么 obs2 和 obs3 也就永远没有机会得到输出.
- concat(of(1,2,3),interval).subscribe(console.log);
- // 1 2 3 0 1 2 3 ...
2.merge 先到先得快速通过
merge 会第时间订阅所有的上游 Observable, 然后对上游的数据采取 "先到先得" 的策略, 任何个 Observable 只要有数据推下来, 就刻转给下游 Observable 对象.
- merge(interval(1000),of(1,2,3)).subscribe(console.log);
- merge(of(1,2,3),interval(1000)).subscribe(console.log);
- // 两种情况的输出结果一样, 都是先一次性输出 1 2 3 再间隔一秒依次输出 0 1 2 ...
- const source1$ = Observable.timer(0, 1000).map(x => x+'A');
- const source2$ = Observable.timer(500, 1000).map(x => x+'B');
- merge(source1$, source2$).subscribe(
- console.log,
- null,
- () => console.log('complete')
- );
- //0A
- //0B
- //1A
- //1B
- //2A
- //2B
merge 的应用场景: 我们知道 fromEvent 可以从页中获取事件, 只可惜, fromEvent 次只能从个 DOM 元素获取种类型的事件. 如, 我们关某个元素的 click 事件, 同时也关这个元素上的 touchend 事件, 因为在移动设备上 touchend 事件出现得 click 更早, 这两个事件的处理是模样的, 但是 fromEvent 不能同时获得两个事件的数据流, 这时候就要借助 merge 的量了, 代码如下:
- const click$ = Rx.Observable.fromEvent(element, 'click');
- const touchend$ = Rx.Observable.fromEvent(element, 'touchend');
- merge(click$, touchend$).subscribe(eventHandler)
3.zip : 拉链式组合
一对一的合并
zip 会把上游的数据转化为数组形式, 每个上游 Observable 贡献的数据会在对应数组中占席之地.
默认的输出格式为数组格式, 可通过第二个参数进行参数格式组装
简而言之: 不管是同步产生数据还是异步产生的数据, 都会每次依次从需要合并的 observable 中取一个数据合并成一个数组输出, 当某一个 observer 不再吐出数据了, 则终止合并, 执行 complete 函数
- const source1$ = Observable.of(1, 2, 3);
- const source2$ = Observable.of('a', 'b', 'c');
- zip(source1$, source2$).subscribe(
- console.log,
- null,
- () => console.log('complete')
- );
- //[ 1, 'a' ]
- //[ 2, 'b' ]
- //[ 3, 'c' ]
- //complete
4.combineLatest: 合并最后一个数据
输出的数组中元素个数与合并的 observable 的个数相等.
在合并的 observable 中, 只有最后一个元素为下游, 前面的参数如果有同步的数据, 同步数据中只有最后一个数据能进入数据流
默认的输出格式为数组格式, 可通过第二个参数进行参数格式组装
第一次执行, 当上游产生了数据, 下游还没来得及产生数据时, 就会等待. 第二轮时候, 不管是上游或者下游产生一个数据, 都会执行输出, 还没来得及产生数据的 observable 就输出原来产生的数据, 如下弹珠图
- const source1$ = Observable.timer(500, 1000);
- const source2$ = Observable.timer(1000, 1000);
- combineLatest(source1$,source2$).subscribe(
- console.log,
- null,
- () => console.log('complete')
- );
- //[ 0, 0 ]
- //[ 1, 0 ]
- //[ 1, 1 ]
- //[ 2, 1 ]
- //[ 2, 2 ]
- //[ 3, 2 ]
5.withLatestFrom
withLatestFrom 为管道中使用的方法. 默认的输出格式为数组格式, 可通过第二个参数进行参数格式组装
withLatestFrom 只有实例操作符的形式, 且所有输 Observable 的地位并不相同, 调 withLatestFrom 的那个 Observable 对象起到主导数据产节奏的作, 作为参数的 Observable 对象只能贡献数据, 不能控制产数据的时机.
- const source1$ = Observable.timer(0, 2000).map(x => 100 * x);
- const source2$ = Observable.timer(500, 1000);
- source1$.pipe(
- withLatestFrom(source2$, (a,b)=> a+b);
- ).subscribe(
- console.log,
- null,
- () => console.log('complete')
- );
source1$ 产第个数据 0 时, withLatestFrom 的另个输 Observable 对象 source2$ 还没有产数据, 所以这个 0 也被忽略了.
解决 glitch
例 1:
- const original$ = Observable.timer(0, 1000);
- const source1$ = original$.map(x => x+'a');
- const source2$ = original$.map(x => x+'b');
- const result$ = source1$.pipe(withLatestFrom(source2$);)
- result$.subscribe(
- console.log,
- null,
- () => console.log('complete')
- );
例 2:
- const event$ = Rx.Observable.fromEvent(document.body, 'click');
- const x$ = event$.map(e => e.x);
- const y$ = event$.map(e => e.y);
- const result$ = x$.pipe(combineLatest(y$, (x, y) => `x: ${x}, y: ${y}`)).subscribe(
- (location) => {
- console.log('#render', location);
- document.querySelector('#text').innerText = location;
- }
- );
race : 胜者通吃
race 就是 "竞争", 多个 Observable 对象在起, 看谁最先产数据, 不过这种竞争是分残酷的, 胜者通吃, 败者则失去所有机会.
简而言之, 通过 race 合并多个 observable 时, 最先吐出数据那个 observable 会成为数据源, 其它的 observable 会被淘汰.
startWith
startWith 只有实例操作符的形式, 其功能是让个 Observable 对象在被订阅的时候, 总是先吐出指定的若个数据. 下是使 startWith 的例代码
- of(0,1,2).pipe(startWith('a','b')).subscribe(console.log);
- // 先依次吐出 a b 0 1 2
- forkJoin
forkJoin 可以接受多个 Observable 对象作为参数, forkJoin 产的 Observable 对象也很有特点, 它只会产个数据, 因为它会等待所有参数 Observable 对象的最后个数据, 也就是说, 只有当所有 Observable 对象都完结, 确定不会有新的数据产的时候, forkJoin 就会把所有输 Observable 对象产的最后个数据合并成给下游唯的数据.
forkJoin 就是 RxJS 界的 Promise.all,Promise.all 等待所有输的 Promise 对象成功之后把结果合并, forkJoin 等待所有输的 Observable 对象完结之后把最后个数据合并.
返回数组形式, 数组中元素个数为合并的 observable 的个数
- JS forkJoin(interval(1000).pipe(take(3)),of(1,2,3),timer(2000,1000).pipe(take(3))).subscribe(console.log); // [2,3,2]
- JS
高阶 Observable
简言之: 阶函数就是产函数的函数; 类似, 所谓阶 Observable, 指的是产的数据依然是 Observable 的 Observable
1.concatAll
concatAll 只有个上游 Observable 对象, 这个 Observable 对象预期是个阶 Observable 对象, concatAll 会对其中的内部 Observable 对象做 concat 的操作.
- interval(1000).pipe(
- take(2),
- map(x=>interval(1500).pipe(take(2),map(x=> `${x}:x,y:${y}`))),
- concatAll()
- ).subscribe(console.log);
- // 0:a,b:0
- // 0:a,b:1
- // 1:a,b:0
- // 1:a,b:1
concat 实际运用
- fromEvent(document.body,'mousedown').pipe(
- map(
- e=>fromEvent(document.body,'mousemove').pipe(map(e=>{return {x:e.clientX,y:e.clientY}}), takeUntil(fromEvent(document.body,'mouseup')))
- ),
- concatAll()
- ).subscribe(console.log);
- mergeAll
mergeAll 就是处理阶 Observable 的 merge, 只是所有的输 Observable 来于上游产的内部 Observable 对象.
- interval(1000).pipe(
- take(2),
- map(x => Observable.interval(1500).map(y => x+':'+y).take(2)),
- mergeAll()
- )
mergeAll 只要
发现上游产个内部 Observable 就会刻订阅, 并从中抽取收据, 所以在上图中, 第个内部 Observable 产的数据 1:0 会出现在第个内部 Observable 产的数据 0:1 之前.
- zipAll
- interval(1000).pipe(
- take(2),
- map(x => Observable.interval(1500).map(y => x+':'+y).take(2)),
- zipAll()
- )
- //[ '0:0', '1:0' ]
- //[ '0:1', '1:1' ]
- //complete
- combeneAll
combineAll 就是处理阶 Observable 的 combineLatest, 可能是因为 combine-LatestAll 太长了, 所以 RxJS 选择了 combineAll 这个名字.
- interval(1000).pipe(
- take(2),
- map(x => Observable.interval(1500).map(y => x+':'+y).take(2)),
- combeneAll()
- )
- //[ '0:0', '1:0' ]
- //[ '0:1', '1:0' ]
- //[ '0:1', '1:1' ]
- //complete
- switch
switch 的含义就是 "切换", 总是切换到最新的内部 Observable 对象获取数据. 每当 switch 的上游阶 Observable 产个内部 Observable 对象, switch 都会刻订阅最新的内部 Observable 对象上, 如果已经订阅了之前的内部 Observable 对象, 就会退订那个过时的内部 Observable 对象, 这个 "上新的, 舍弃旧的" 动作, 就是切换.
应用场景: 也就是外层的数据产生快于内层的数据产生的速度造成数据积压, 需求又能够舍弃原来的旧的外层的数据不让其旧的外层数据再传递到内层产生数据了.
简而言之, 当外层新产生数据时, 无论内部数据产生情况如何都作废, 重新计算数据流
- interval(1000).pipe(
- take(3),
- map(x => Observable.interval(1500).map(y => x+':'+y).take(2)),
- switch()
- )
- //1:0
- //1:1
- //complete
第个 Observable 对象有机会产数据 0:0, 但是在第个数据 0:1 产之前, 第个内部 Observable 对象产, 这时发切换, 第个内部 Observable 就退场了. 同样, 第个内部 Observable 只有机会产个数据 1:0, 然后第三个内部 Observable 对象产, 之后没有新的内部 Observable 对象产, 所以第三个 Observable 对象的两个数据 2:0 和 2:1 都进了下游.
exhaust
在耗尽当前内部 Observable 的数据之前不会切换到下个内部 Observable 对象
同样是连接阶 Observable 产的内部 Observable 对象, 但是 exhaust 的策略和 switch 相反, 当内部 Observable 对象在时间上发重叠时, 情景就是前个内部 Observable 还没有完结, 新的 Observable 又已经产, 到底应该选择哪个作为数据源? switch 选择新产的内部 Observable 对象, exhaust 则选择前个内部 Observable 对象.
- interval(1000).pipe(
- take(3),
- map(x => Observable.interval(700).map(y => x+':'+y).take(2)),
- exhaust()
- )
posted on 2019-05-14 16:59 长安城下翩翩少年 阅读 (...) 评论 (...) 编辑 收藏
来源: https://www.cnblogs.com/honkerzh/p/10863190.html