在没有深度使用函数回调的经验的时候,去看这些内容还是有一点吃力的。由于Node.js独特的异步特性,才出现了“回调地狱”的问题,这篇文章中,我比较详细的记录了如何解决异步流问题。
文章会很长,而且这篇是对异步流模式的解释。文中会使用一个简单的网络蜘蛛的例子,它的作用是抓取指定URL的网页内容并保存在项目中,在文章的最后,可以找到整篇文章中的源码demo。
本篇不针对初学者,因此会省略掉大部分的基础内容的讲解:
(spider_v1.js)
- const request = require("request");
- const fs = require("fs");
- const mkdirp = require("mkdirp");
- const path = require("path");
- const utilities = require("./utilities");
- function spider(url, callback) {
- const filename = utilities.urlToFilename(url);
- console.log(`filename: ${filename}`);
- fs.exists(filename, exists => {
- if (!exists) {
- console.log(`Downloading ${url}`);
- request(url, (err, response, body) => {
- if (err) {
- callback(err);
- } else {
- mkdirp(path.dirname(filename), err => {
- if (err) {
- callback(err);
- } else {
- fs.writeFile(filename, body, err => {
- if (err) {
- callback(err);
- } else {
- callback(null, filename, true);
- }
- });
- }
- });
- }
- });
- } else {
- callback(null, filename, false);
- }
- });
- }
- spider(process.argv[2], (err, filename, downloaded) => {
- if (err) {
- console.log(err);
- } else if (downloaded) {
- console.log(`Completed the download of ${filename}`);
- } else {
- console.log(`${filename} was already downloaded`);
- }
- });
上边的代码的流程大概是这样的:
这是一个非常简单版本的蜘蛛,他只能抓取一个url的内容,看到上边的回调多么令人头疼。那么我们开始进行优化。
首先,if else 这种方式可以进行优化,这个很简单,不用多说,放一个对比效果:
- /// before
- if (err) {
- callback(err);
- } else {
- callback(null, filename, true);
- }
- /// after
- if (err) {
- return callback(err);
- }
- callback(null, filename, true);
代码这么写,嵌套就会少一层,但经验丰富的程序员会认为,这样写过重强调了error,我们编程的重点应该放在处理正确的数据上,在可读性上也存在这样的要求。
另一个优化是函数拆分,上边代码中的spider函数中,可以把下载文件和保存文件拆分出去。
(spider_v2.js)
- const request = require("request");
- const fs = require("fs");
- const mkdirp = require("mkdirp");
- const path = require("path");
- const utilities = require("./utilities");
- function saveFile(filename, contents, callback) {
- mkdirp(path.dirname(filename), err => {
- if (err) {
- return callback(err);
- }
- fs.writeFile(filename, contents, callback);
- });
- }
- function download(url, filename, callback) {
- console.log(`Downloading ${url}`);
- request(url, (err, response, body) => {
- if (err) {
- return callback(err);
- }
- saveFile(filename, body, err => {
- if (err) {
- return callback(err);
- }
- console.log(`Downloaded and saved: ${url}`);
- callback(null, body);
- });
- })
- }
- function spider(url, callback) {
- const filename = utilities.urlToFilename(url);
- console.log(`filename: ${filename}`);
- fs.exists(filename, exists => {
- if (exists) {
- return callback(null, filename, false);
- }
- download(url, filename, err => {
- if (err) {
- return callback(err);
- }
- callback(null, filename, true);
- })
- });
- }
- spider(process.argv[2], (err, filename, downloaded) => {
- if (err) {
- console.log(err);
- } else if (downloaded) {
- console.log(`Completed the download of ${filename}`);
- } else {
- console.log(`${filename} was already downloaded`);
- }
- });
上边的代码基本上是采用原生优化后的结果,但这个蜘蛛的功能太过简单,我们现在需要抓取某个网页中的所有url,这样才会引申出串行和并行的问题。
(spider_v3.js)
- const request = require("request");
- const fs = require("fs");
- const mkdirp = require("mkdirp");
- const path = require("path");
- const utilities = require("./utilities");
- function saveFile(filename, contents, callback) {
- mkdirp(path.dirname(filename), err = >{
- if (err) {
- return callback(err);
- }
- fs.writeFile(filename, contents, callback);
- });
- }
- function download(url, filename, callback) {
- console.log(`Downloading $ {
- url
- }`);
- request(url, (err, response, body) = >{
- if (err) {
- return callback(err);
- }
- saveFile(filename, body, err = >{
- if (err) {
- return callback(err);
- }
- console.log(`Downloaded and saved: $ {
- url
- }`);
- callback(null, body);
- });
- })
- }
- /// 最大的启发是实现了如何异步循环遍历数组
- function spiderLinks(currentUrl, body, nesting, callback) {
- if (nesting === 0) {
- return process.nextTick(callback);
- }
- const links = utilities.getPageLinks(currentUrl, body);
- function iterate(index) {
- if (index === links.length) {
- return callback();
- }
- spider(links[index], nesting - 1, err = >{
- if (err) {
- return callback(err);
- }
- iterate((index + 1));
- })
- }
- iterate(0);
- }
- function spider(url, nesting, callback) {
- const filename = utilities.urlToFilename(url);
- fs.readFile(filename, "utf8", (err, body) = >{
- if (err) {
- if (err.code !== 'ENOENT') {
- return callback(err);
- }
- return download(url, filename, (err, body) = >{
- if (err) {
- return callback(err);
- }
- spiderLinks(url, body, nesting, callback);
- });
- }
- spiderLinks(url, body, nesting, callback);
- });
- }
- spider(process.argv[2], 2, (err, filename, downloaded) = >{
- if (err) {
- console.log(err);
- } else if (downloaded) {
- console.log(`Completed the download of $ {
- filename
- }`);
- } else {
- console.log(`$ {
- filename
- }
- was already downloaded`);
- }
- });
上边的代码相比之前的代码多了两个核心功能,首先是通过辅助类获取到了某个body中的links:
- const
- links
- =
- utilities
- .
- getPageLinks
- (currentUrl
- ,
- body)
- ;
内部实现就不解释了,另一个核心代码就是:
- /// 最大的启发是实现了如何异步循环遍历数组
- function spiderLinks(currentUrl, body, nesting, callback) {
- if (nesting === 0) {
- return process.nextTick(callback);
- }
- const links = utilities.getPageLinks(currentUrl, body);
- function iterate(index) {
- if (index === links.length) {
- return callback();
- }
- spider(links[index], nesting - 1, err = >{
- if (err) {
- return callback(err);
- }
- iterate((index + 1));
- })
- }
- iterate(0);
- }
可以说上边这一小段代码,就是采用原生实现异步串行的pattern了。除了这些之外,还引入了nesting的概念,通过这是这个属性,可以控制抓取层次。
到这里我们就完整的实现了串行的功能,考虑到性能,我们要开发并行抓取的功能。
(spider_v4.js)
- const request = require("request");
- const fs = require("fs");
- const mkdirp = require("mkdirp");
- const path = require("path");
- const utilities = require("./utilities");
- function saveFile(filename, contents, callback) {
- mkdirp(path.dirname(filename), err = >{
- if (err) {
- return callback(err);
- }
- fs.writeFile(filename, contents, callback);
- });
- }
- function download(url, filename, callback) {
- console.log(`Downloading $ {
- url
- }`);
- request(url, (err, response, body) = >{
- if (err) {
- return callback(err);
- }
- saveFile(filename, body, err = >{
- if (err) {
- return callback(err);
- }
- console.log(`Downloaded and saved: $ {
- url
- }`);
- callback(null, body);
- });
- })
- }
- /// 最大的启发是实现了如何异步循环遍历数组
- function spiderLinks(currentUrl, body, nesting, callback) {
- if (nesting === 0) {
- return process.nextTick(callback);
- }
- const links = utilities.getPageLinks(currentUrl, body);
- if (links.length === 0) {
- return process.nextTick(callback);
- }
- let completed = 0,
- hasErrors = false;
- function done(err) {
- if (err) {
- hasErrors = true;
- return callback(err);
- }
- if (++completed === links.length && !hasErrors) {
- return callback();
- }
- }
- links.forEach(link = >{
- spider(link, nesting - 1, done);
- });
- }
- const spidering = new Map();
- function spider(url, nesting, callback) {
- if (spidering.has(url)) {
- return process.nextTick(callback);
- }
- spidering.set(url, true);
- const filename = utilities.urlToFilename(url);
- /// In this pattern, there will be some issues.
- /// Possible problems to download the same url again and again。
- fs.readFile(filename, "utf8", (err, body) = >{
- if (err) {
- if (err.code !== 'ENOENT') {
- return callback(err);
- }
- return download(url, filename, (err, body) = >{
- if (err) {
- return callback(err);
- }
- spiderLinks(url, body, nesting, callback);
- });
- }
- spiderLinks(url, body, nesting, callback);
- });
- }
- spider(process.argv[2], 2, (err, filename, downloaded) = >{
- if (err) {
- console.log(err);
- } else if (downloaded) {
- console.log(`Completed the download of $ {
- filename
- }`);
- } else {
- console.log(`$ {
- filename
- }
- was already downloaded`);
- }
- });
这段代码同样很简单,也有两个核心内容。一个是如何实现并发:
- /// 最大的启发是实现了如何异步循环遍历数组
- function spiderLinks(currentUrl, body, nesting, callback) {
- if (nesting === 0) {
- return process.nextTick(callback);
- }
- const links = utilities.getPageLinks(currentUrl, body);
- if (links.length === 0) {
- return process.nextTick(callback);
- }
- let completed = 0,
- hasErrors = false;
- function done(err) {
- if (err) {
- hasErrors = true;
- return callback(err);
- }
- if (++completed === links.length && !hasErrors) {
- return callback();
- }
- }
- links.forEach(link = >{
- spider(link, nesting - 1, done);
- });
- }
上边的代码可以说是实现并发的一个pattern。利用循环遍历来实现。另一个核心是,既然是并发的,那么利用
就会存在问题,可能会重复下载同一文件,这里的解决方案是:
- fs.exists
现在我们又有了新的需求,要求限制同时并发的最大数,那么在这里就引进了一个我认为最重要的概念:队列。
(task-Queue.js)
- class TaskQueue {
- constructor(concurrency) {
- this.concurrency = concurrency;
- this.running = 0;
- this.queue = [];
- }
- pushTask(task) {
- this.queue.push(task);
- this.next();
- }
- next() {
- while (this.running < this.concurrency && this.queue.length) {
- const task = this.queue.shift();
- task(() = >{
- this.running--;
- this.next();
- });
- this.running++;
- }
- }
- }
- module.exports = TaskQueue;
上边的代码就是队列的实现代码,核心是
方法,可以看出,当task加入队列中后,会立刻执行,这不是说这个任务一定马上执行,而是指的是next会立刻调用。
- next()
(spider_v5.js)
- const request = require("request");
- const fs = require("fs");
- const mkdirp = require("mkdirp");
- const path = require("path");
- const utilities = require("./utilities");
- const TaskQueue = require("./task-Queue");
- const downloadQueue = new TaskQueue(2);
- function saveFile(filename, contents, callback) {
- mkdirp(path.dirname(filename), err => {
- if (err) {
- return callback(err);
- }
- fs.writeFile(filename, contents, callback);
- });
- }
- function download(url, filename, callback) {
- console.log(`Downloading ${url}`);
- request(url, (err, response, body) => {
- if (err) {
- return callback(err);
- }
- saveFile(filename, body, err => {
- if (err) {
- return callback(err);
- }
- console.log(`Downloaded and saved: ${url}`);
- callback(null, body);
- });
- })
- }
- /// 最大的启发是实现了如何异步循环遍历数组
- function spiderLinks(currentUrl, body, nesting, callback) {
- if (nesting === 0) {
- return process.nextTick(callback);
- }
- const links = utilities.getPageLinks(currentUrl, body);
- if (links.length === 0) {
- return process.nextTick(callback);
- }
- let completed = 0, hasErrors = false;
- links.forEach(link => {
- /// 给队列出传递一个任务,这个任务首先是一个函数,其次该函数接受一个参数
- /// 当调用任务时,触发该函数,然后给函数传递一个参数,告诉该函数在任务结束时干什么
- downloadQueue.pushTask(done => {
- spider(link, nesting - 1, err => {
- /// 这里表示,只要发生错误,队列就会退出
- if (err) {
- hasErrors = true;
- return callback(err);
- }
- if (++completed === links.length && !hasErrors) {
- callback();
- }
- done();
- });
- });
- });
- }
- const spidering = new Map();
- function spider(url, nesting, callback) {
- if (spidering.has(url)) {
- return process.nextTick(callback);
- }
- spidering.set(url, true);
- const filename = utilities.urlToFilename(url);
- /// In this pattern, there will be some issues.
- /// Possible problems to download the same url again and again。
- fs.readFile(filename, "utf8", (err, body) => {
- if (err) {
- if (err.code !== 'ENOENT') {
- return callback(err);
- }
- return download(url, filename, (err, body) => {
- if (err) {
- return callback(err);
- }
- spiderLinks(url, body, nesting, callback);
- });
- }
- spiderLinks(url, body, nesting, callback);
- });
- }
- spider(process.argv[2], 2, (err, filename, downloaded) => {
- if (err) {
- console.log(`error: ${err}`);
- } else if (downloaded) {
- console.log(`Completed the download of ${filename}`);
- } else {
- console.log(`${filename} was already downloaded`);
- }
- });
因此,为了限制并发的个数,只需在
方法中,把task遍历放入队列就可以了。这相对来说很简单。
- spiderLinks
到这里为止,我们使用原生JavaScript实现了一个有相对完整功能的网络蜘蛛,既能串行,也能并发,还可以控制并发个数。
把不同的功能放到不同的函数中,会给我们带来巨大的好处,async库十分流行,它的性能也不错,它内部基于callback。
(spider_v6.js)
- const request = require("request");
- const fs = require("fs");
- const mkdirp = require("mkdirp");
- const path = require("path");
- const utilities = require("./utilities");
- const series = require("async/series");
- const eachSeries = require("async/eachSeries");
- function download(url, filename, callback) {
- console.log(`Downloading $ {
- url
- }`);
- let body;
- series([callback = >{
- request(url, (err, response, resBody) = >{
- if (err) {
- return callback(err);
- }
- body = resBody;
- callback();
- });
- },
- mkdirp.bind(null, path.dirname(filename)), callback = >{
- fs.writeFile(filename, body, callback);
- }], err = >{
- if (err) {
- return callback(err);
- }
- console.log(`Downloaded and saved: $ {
- url
- }`);
- callback(null, body);
- });
- }
- /// 最大的启发是实现了如何异步循环遍历数组
- function spiderLinks(currentUrl, body, nesting, callback) {
- if (nesting === 0) {
- return process.nextTick(callback);
- }
- const links = utilities.getPageLinks(currentUrl, body);
- if (links.length === 0) {
- return process.nextTick(callback);
- }
- eachSeries(links, (link, cb) = >{
- "use strict";
- spider(link, nesting - 1, cb);
- },
- callback);
- }
- const spidering = new Map();
- function spider(url, nesting, callback) {
- if (spidering.has(url)) {
- return process.nextTick(callback);
- }
- spidering.set(url, true);
- const filename = utilities.urlToFilename(url);
- fs.readFile(filename, "utf8", (err, body) = >{
- if (err) {
- if (err.code !== 'ENOENT') {
- return callback(err);
- }
- return download(url, filename, (err, body) = >{
- if (err) {
- return callback(err);
- }
- spiderLinks(url, body, nesting, callback);
- });
- }
- spiderLinks(url, body, nesting, callback);
- });
- }
- spider(process.argv[2], 1, (err, filename, downloaded) = >{
- if (err) {
- console.log(err);
- } else if (downloaded) {
- console.log(`Completed the download of $ {
- filename
- }`);
- } else {
- console.log(`$ {
- filename
- }
- was already downloaded`);
- }
- });
在上边的代码中,我们只使用了async的三个功能:
- const series = require("async/series"); // 串行
- const eachSeries = require("async/eachSeries"); // 并行
- const queue = require("async/queue"); // 队列
由于比较简单,就不做解释了。async中的队列的代码在(spider_v7.js)中,和上边我们自定义的队列很相似,也不做更多解释了。
Promise是一个协议,有很多库实现了这个协议,我们用的是ES6的实现。简单来说promise就是一个约定,如果完成了,就调用它的resolve方法,失败了就调用它的reject方法。它内有实现了then方法,then返回promise本身,这样就形成了调用链。
其实Promise的内容有很多,在实际应用中是如何把普通的函数promise化。这方面的内容在这里也不讲了,我自己也不够格
(spider_v8.js)
- const utilities = require("./utilities");
- const request = utilities.promisify(require("request"));
- const fs = require("fs");
- const readFile = utilities.promisify(fs.readFile);
- const writeFile = utilities.promisify(fs.writeFile);
- const mkdirp = utilities.promisify(require("mkdirp"));
- const path = require("path");
- function saveFile(filename, contents, callback) {
- mkdirp(path.dirname(filename), err = >{
- if (err) {
- return callback(err);
- }
- fs.writeFile(filename, contents, callback);
- });
- }
- function download(url, filename) {
- console.log(`Downloading $ {
- url
- }`);
- let body;
- return request(url).then(response = >{
- "use strict";
- body = response.body;
- return mkdirp(path.dirname(filename));
- }).then(() = >writeFile(filename, body)).then(() = >{
- "use strict";
- console.log(`Downloaded adn saved: $ {
- url
- }`);
- return body;
- });
- }
- /// promise编程的本质就是为了解决在函数中设置回调函数的问题
- /// 通过中间层promise来实现异步函数同步化
- function spiderLinks(currentUrl, body, nesting) {
- let promise = Promise.resolve();
- if (nesting === 0) {
- return promise;
- }
- const links = utilities.getPageLinks(currentUrl, body);
- links.forEach(link = >{
- "use strict";
- promise = promise.then(() = >spider(link, nesting - 1));
- });
- return promise;
- }
- function spider(url, nesting) {
- const filename = utilities.urlToFilename(url);
- return readFile(filename, "utf8").then(body = >spiderLinks(url, body, nesting), err = >{
- "use strict";
- if (err.code !== 'ENOENT') {
- /// 抛出错误,这个方便与在整个异步链的最后通过呢catch来捕获这个链中的错误
- throw err;
- }
- return download(url, filename).then(body = >spiderLinks(url, body, nesting));
- });
- }
- spider(process.argv[2], 1).then(() = >{
- "use strict";
- console.log('Download complete');
- }).
- catch(err = >{
- "use strict";
- console.log(err);
- });
可以看到上边的代码中的函数都是没有callback的,只需要在最后catch就可以了。
在设计api的时候,应该支持两种方式,及支持callback,又支持promise
- function asyncDivision(dividend, divisor, cb) {
- return new Promise((resolve, reject) = >{
- "use strict";
- process.nextTick(() = >{
- const result = dividend / divisor;
- if (isNaN(result) || !Number.isFinite(result)) {
- const error = new Error("Invalid operands");
- if (cb) {
- cb(error);
- }
- return reject(error);
- }
- if (cb) {
- cb(null, result);
- }
- resolve(result);
- });
- });
- }
- asyncDivision(10, 2, (err, result) = >{
- "use strict";
- if (err) {
- return console.log(err);
- }
- console.log(result);
- });
- asyncDivision(22, 11).then((result) = >console.log(result)).
- catch((err) = >console.log(err));
Generator很有意思,他可以让暂停函数和恢复函数,利用thunkify和co这两个库,我们下边的代码实现起来非常酷。
(spider_v9.js)
- const thunkify = require("thunkify");
- const co = require("co");
- const path = require("path");
- const utilities = require("./utilities");
- const request = thunkify(require("request"));
- const fs = require("fs");
- const mkdirp = thunkify(require("mkdirp"));
- const readFile = thunkify(fs.readFile);
- const writeFile = thunkify(fs.writeFile);
- const nextTick = thunkify(process.nextTick);
- function * download(url, filename) {
- console.log(`Downloading $ {
- url
- }`);
- const response = yield request(url);
- console.log(response);
- const body = response[1];
- yield mkdirp(path.dirname(filename));
- yield writeFile(filename, body);
- console.log(`Downloaded and saved $ {
- url
- }`);
- return body;
- }
- function * spider(url, nesting) {
- const filename = utilities.urlToFilename(url);
- let body;
- try {
- body = yield readFile(filename, "utf8");
- } catch(err) {
- if (err.code !== 'ENOENT') {
- throw err;
- }
- body = yield download(url, filename);
- }
- yield spiderLinks(url, body, nesting);
- }
- function * spiderLinks(currentUrl, body, nesting) {
- if (nesting === 0) {
- return nextTick();
- }
- const links = utilities.getPageLinks(currentUrl, body);
- for (let i = 0; i < links.length; i++) {
- yield spider(links[i], nesting - 1);
- }
- }
- /// 通过co就自动处理了回调函数,直接返回了回调函数中的参数,把这些参数放到一个数组中,但是去掉了err信息
- co(function * () {
- try {
- yield spider(process.argv[2], 1);
- console.log('Download complete');
- } catch(err) {
- console.log(err);
- }
- });
我并没有写promise和generator并发的代码。以上这些内容来自于这本书nodejs-design-patternshttps://github.com/agelessman/MyBooks。
demo下载
来源: http://www.cnblogs.com/machao/p/7724321.html