这里有新鲜出炉的 Node.js 主要方法使用说明,程序狗速度看过来!
Node.js 是一个基于 Chrome JavaScript 运行时建立的一个平台, 用来方便地搭建快速的 易于扩展的网络应用 · Node.js 借助事件驱动, 非阻塞 I/O 模型变得轻量和高效, 非常适合 运行在分布式设备 的 数据密集型 的实时应用
本文给大家分享的是使用 Node.js + Redis Sorted Set 实现任务队列的方法示例,非常的实用,有需要的小伙伴可以参考下
需求:功能 A 需要调用第三方 API 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 {data:" 查询结果 ","status":" 正在异步处理中 "} ,这样就需要间隔一段时间后再去调用第三方 API 获取数据。为了用户在使用功能 A 时不会因为第三方 API 正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求。然后定时从任务队列里中取出任务调用第三方 API,若返回状态为" 异步处理中 ",将该任务再次加入任务队列,若返回状态为" 已处理完毕 ",将返回数据入库。
根据以上问题,想到使用 Node.js + Redis sorted set 来实现任务队列。Node.js 实现自身应用 API 用来接受用户请求,合并数据库已存数据与 API 返回的部分数据返回给用户,并将任务加入到任务队列中。利用 Node.js child process 与 cron 定时从任务队列中取出任务执行。
在设计任务队列的过程中需要考虑到的几个问题
针对以上问题的解决方案
示例代码
- // remote_api.js 模拟第三方 API
- 'use strict';
- const app = require('express')();
- app.get('/', (req, res) = >{
- setTimeout(() = >{
- let arr = [200, 300]; // 200 代表成功,300 代表失败需要重新请求
- res.status(200).send({
- 'status': arr[parseInt(Math.random() * 2)]
- });
- },
- 3000);
- });
- app.listen('9001', () = >{
- console.log('API 服务监听端口:9001');
- });
- // producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列
- 'use strict';
- const app = require('express')();
- const redisClient = require('redis').createClient();
- const QUEUE_NAME = 'queue:example';
- function addTaskToQueue(taskName, callback) {
- // 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列
- redisClient.zscore(QUEUE_NAME, taskName, (error, task) = >{
- if (error) {
- console.log(error);
- } else {
- if (task) {
- console.log('任务已存在,不新增相同任务');
- callback(null, task);
- } else {
- redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) = >{
- if (error) {
- callback(error);
- } else {
- callback(null, result);
- }
- });
- }
- }
- });
- }
- app.get('/', (req, res) = >{
- let taskName = req.query['task-name'];
- addTaskToQueue(taskName, (error, result) = >{
- if (error) {
- console.log(error);
- } else {
- res.status(200).send('正在查询中......');
- }
- });
- });
- app.listen(9002, () = >{
- console.log('生产者服务监听端口:9002');
- });
- // consumer.js 定时获取任务并执行
- 'use strict';
- const redisClient = require('redis').createClient();
- const request = require('request');
- const schedule = require('node-schedule');
- const QUEUE_NAME = 'queue:expmple';
- const PARALLEL_TASK_NUMBER = 2; // 并行执行任务数量
- function getTasksFromQueue(callback) {
- // 获取多个任务
- redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) = >{
- if (error) {
- callback(error);
- } else {
- // 将任务分值设置为 0,表示正在处理
- if (tasks.length > 0) {
- let tmp = [];
- tasks.forEach((task) = >{
- tmp.push(0);
- tmp.push(task);
- });
- redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) = >{
- if (error) {
- callback(error);
- } else {
- callback(null, tasks)
- }
- });
- }
- }
- });
- }
- function addFailedTaskToQueue(taskName, callback) {
- redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) = >{
- if (error) {
- callback(error);
- } else {
- callback(null, result);
- }
- });
- }
- function removeSucceedTaskFromQueue(taskName, callback) {
- redisClient.zrem(QUEUE_NAME, taskName, (error, result) = >{
- if (error) {
- callback(error);
- } else {
- callback(null, result);
- }
- })
- }
- function execTask(taskName) {
- return new Promise((resolve, reject) = >{
- let requestOptions = {
- 'url': 'http://127.0.0.1:9001',
- 'method': 'GET',
- 'timeout': 5000
- };
- request(requestOptions, (error, response, body) = >{
- if (error) {
- resolve('failed');
- console.log(error);
- addFailedTaskToQueue(taskName, (error) = >{
- if (error) {
- console.log(error);
- } else {}
- });
- } else {
- try {
- body = typeof body !== 'object' ? JSON.parse(body) : body;
- } catch(error) {
- resolve('failed');
- console.log(error);
- addFailedTaskToQueue(taskName, (error, result) = >{
- if (error) {
- console.log(error);
- } else {}
- });
- return;
- }
- if (body.status !== 200) {
- resolve('failed');
- addFailedTaskToQueue(taskName, (error, result) = >{
- if (error) {
- console.log(error);
- } else {}
- });
- } else {
- resolve('succeed');
- removeSucceedTaskFromQueue(taskName, (error, result) = >{
- if (error) {
- console.log(error);
- } else {}
- });
- }
- }
- });
- });
- }
- // 定时,每隔 5 秒获取新的任务来执行
- let job = schedule.scheduleJob('*/5 * * * * *', () = >{
- console.log('获取新任务');
- getTasksFromQueue((error, tasks) = >{
- if (error) {
- console.log(error);
- } else {
- if (tasks.length > 0) {
- console.log(tasks);
- Promise.all(tasks.map(execTask)).then((results) = >{
- console.log(results);
- }).
- catch((error) = >{
- console.log(error);
- });
- }
- }
- });
- });
来源: http://www.phperz.com/article/17/0430/331769.html