今天, 公司内大神问我 Promise 有没有类似 Promise.all 多个并发执行的并且保证数据的顺序, 但是又可以完成一个异步操作就执行异步的方法, 比如:
这里有 5 个 http 分别耗时
[1000, 2000, 3000, 5000, 7000] ms
, 假如我使用 Promise.all 从完成请求到开始执行回调一共需要时间是 7000ms
- const arr = [1000, 200, 500, 700, 600];
- const mockHttp = v => new Promise((resolve, reject) => {
- setTimeout(() => resolve(v), v);
- });
- const tasks = [];
- arr.forEach(v => tasks.push(mockHttp(v)));
- const cb = v => console.log(v);
- (async () => {
- console.time();
- const result = await Promise.all(tasks);
- console.timeEnd();
- // Get data, then do callback
- result.forEach(v => cb(v));
- })()
- // Result
- // default: 1000.254150390625ms
- // 1000
- // 200
- // 500
- // 700
- // 600
这样虽然能保证拿到的结果是按照顺序来的, 但是, 在执行回调的时候却是在等全部异步结束后, 我期望的是这样的
- // 1000
- // 200
- // 500
- // 700
- // 600
- // default: 1000.254150390625ms
保证异步结果是按顺序返回的
并发
这个我自己感觉是 Promise.all 与 Promise.race 的结合, 无奈没有发现 Promise 规范中没有这种接口, 于是就尝试自己实现.
当时正好在看 Rxjs , 所以感觉这个刚好就很适合这种场景, 于是就开始写
首先, 需要同样需要 mock 异步的过程, 这里直接使用上面的 setTimeout 来 mock 异步的过程.
- const mockHttp = v => new Promise((resolve, reject) => {
- setTimeout(() => resolve(v), v);
- })
然后, 我们开始写异步并发的代码, 这里使用 Rxjs 的观察者模式
- const tasks = [3500, 300, 5000, 3000, 700, 2000, 5];
- const a = Rx.Observable.create((obs) => {
- tasks.forEach((v, idx) => {
- // Do something async but return value must include { value, index }
- mockHttp(v).then(data => obs.next({data, idx}))
- });
- });
为了能拿到异步结果并且执行, 需要添加观察者.
- // 假如直接订阅就没有意义了, 不能让异步的结果按照顺序返回
- function scheduler (input) {
- const queen = [];
- let idxObj = {};
- return Rx.Observable.create((obs) => {
- input.subscribe((data) => {
- if (data.idx === 0 || data.idx === idxObj.idx) {
- obs.next(data);
- idxObj.idx = data.idx + 1;
- // For wait queen value
- it(idxObj, queen, obs)
- } else {
- queen.push(data);
- }
- })
- })
- }
- function it (idxObj, data, obs) {
- const newArr = Array.from(data).sort((a, b) => a.idx - b.idx)
- if (newArr.length !== 0 && newArr[0].idx === idxObj.idx) {
- obs.next(newArr[0])
- data.splice(data.findIndex(v => v.idx === newArr[0].idx), 1);
- idxObj.idx = newArr[0].idx + 1;
- it(idxObj, data, obs);
- }
- }
这段代码是整个功能的核心, 主要作用就是增加一个队列用于缓存排在后面但异步所需时间较短的返回值, 然后在每一次返回值时去遍历这个队列将顺序的值给返回出来. 因为 Rxjs 的观察者订阅模式, 可以较为简单的实现这个功能, 也可以使用 nodejs 的 event 库来达到同样的目的.
最后, 加上订阅函数, 也就是 handle , 就能达到预期的效果了.
- const handle = v => console.log(v);
- scheduler(a).subscribe(v => {
- console.log(v);
- handle(v.data);
- })
这里为了能看出执行顺序, 所以把顺序也给附加在返回值中.
最后附上全部代码链接 JS Fiddle https://jsfiddle.net/Lrw1f5jk/2/
假如需要更直观的看到执行顺序使用下面代码在 Rxjs 页面执行.
- console.time();
- const tasks = [3500, 300, 5000, 3000, 700, 2000, 5];
- const mockHttp = v => new Promise((resolve, reject) => {
- setTimeout(() => resolve(v), v);
- })
- const a = Rx.Observable.create((obs) => {
- tasks.forEach((v, idx) => {
- // Do something async but return value must include { value, index }
- mockHttp(v).then(data => obs.next({data, idx}))
- });
- });
- function scheduler (input) {
- const queen = [];
- let idxObj = {};
- return Rx.Observable.create((obs) => {
- input.subscribe((data) => {
- if (data.idx === 0 || data.idx === idxObj.idx) {
- obs.next(data);
- idxObj.idx = data.idx + 1;
- // For wait queen value
- it(idxObj, queen, obs)
- } else {
- queen.push(data);
- }
- })
- })
- }
- function it (idxObj, data, obs) {
- const newArr = Array.from(data).sort((a, b) => a.idx - b.idx)
- if (newArr.length !== 0 && newArr[0].idx === idxObj.idx) {
- obs.next(newArr[0])
- data.splice(data.findIndex(v => v.idx === newArr[0].idx), 1);
- idxObj.idx = newArr[0].idx + 1;
- it(idxObj, data, obs);
- }
- }
- const handle = v => { console.timeEnd(); console.log(v); console.time() }
- scheduler(a).subscribe(handle)
总结: 写了这么多, 就为了两个目标, 一个是并发异步, 另一个是只要按照顺序执行 callback 并且不是等全部并发都完成. 如有疏漏, 请指正, 谢谢!
来源: https://juejin.im/entry/5af5c3ddf265da0b7c0756f8