一. 什么是消息队列?
消息 (Message) 是指在应用间传送的数据. 消息可以非常简单, 比如只包含文本字符串, 也可以更复杂, 可能包含嵌入对象.
消息队列 (Message Queue) 是一种应用间的通信方式, 消息发送后可以立即返回, 由消息系统来确保消息的可靠传递. 消息发布者只管把消息发布到 MQ 中而不用管谁来取, 消息使用者只管从 MQ 中取消息而不管是谁发布的. 这样发布者和使用者都不用知道对方的存在.
二. 常用的消息队列有哪些?
RabbitMQ,RocketMQ,ActiveMQ,Kafka,ZeroMQ,MetaMq.
甚至现在部分 NoSQL 也可做消息队列, 如 Redis.
三. 消息队列的使用场景?
异步处理
应用解耦
流量削峰
四. 使用案例
上规模的公司都会有自己的日志分析系统, 日志系统是怎么实现的呢?
图解: 用户在访问应用的时候, 我们要记录下用户的操作记录和系统的异常日志, 常规的做法是将系统产生的日志保存到服务器磁盘, 在服务器中开启定时任务, 定时将磁盘的日志信息传入 mq 中(生产者), 也定时将 mq 中的消息取出并存到相应的数据库, 如 ElasticSearch 或 Hive 中.
五. 如何安装 RabbitMQ?
上面的案例介绍了 MQ 的一个使用场景, 我这里是用 RabbitMQ 举例, 现实项目中可能用到的是 Kafka.
首先安装 brew(mac 为例)
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
安装 RabbitMQ
brew install rabbitmq
运行 RabbitMQ
进入到 /usr/local/Cellar/rabbitmq/3.7.7, 执行
sbin/rabbitmq-server
启动插件
进入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin
./rabbitmq-plugins enable rabbitmq_management
登陆管理界面
打开浏览器输入: http://localhost:15672,RabbitMQ 默认 15672 端口六. Nodejs 操作 RabbitMQ
网上可以找到好几个相应的 Node SDK, 这里推荐 amqplib
1. 生产者
- /**
- * 对 RabbitMQ 的封装
- */
- let amqp = require('amqplib');
- class RabbitMQ {
- constructor() {
- this.hosts = [];
- this.index = 0;
- this.length = this.hosts.length;
- this.open = amqp.connect(this.hosts[this.index]);
- }
- sendQueueMsg(queueName, msg, errCallBack) {
- let self = this;
- self.open
- .then(function (conn) {
- return conn.createChannel();
- })
- .then(function (channel) {
- return channel.assertQueue(queueName).then(function (ok) {
- return channel.sendToQueue(queueName, new Buffer(msg), {
- persistent: true
- });
- })
- .then(function (data) {
- if (data) {
- errCallBack && errCallBack("success");
- channel.close();
- }
- })
- .catch(function () {
- setTimeout(() => {
- if (channel) {
- channel.close();
- }
- }, 500)
- });
- })
- .catch(function () {
- let num = self.index++;
- if (num <= self.length - 1) {
- self.open = amqp.connect(self.hosts[num]);
- } else {
- self.index == 0;
- }
- });
- }
- }
2. 消费者
- /**
- * 对 RabbitMQ 的封装
- */
- let amqp = require('amqplib');
- class RabbitMQ {
- constructor() {
- this.open = amqp.connect(this.hosts[this.index]);
- }
- receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
- let self = this;
- self.open
- .then(function (conn) {
- return conn.createChannel();
- })
- .then(function (channel) {
- return channel.assertQueue(queueName)
- .then(function (ok) {
- return channel.consume(queueName, function (msg) {
- if (msg !== null) {
- let data = msg.content.toString();
- channel.ack(msg);
- receiveCallBack && receiveCallBack(data);
- }
- })
- .finally(function () {
- setTimeout(() => {
- if (channel) {
- channel.close();
- }
- }, 500)
- });
- })
- })
- .catch(function () {
- let num = self.index++;
- if (num <= self.length - 1) {
- self.open = amqp.connect(self.hosts[num]);
- } else {
- self.index = 0;
- self.open = amqp.connect(self.hosts[0]);
- }
- });
- }
3. 通过生产者向 MQ 发送一个消息, 并创建队列
- let mq = new RabbitMQ();
- mq.sendQueueMsg('testQueue', 'my first message', (error) => {
- console.log(error)
- })
执行之后, 我们打开管理平台, 发现 RabbbitMQ 已经接受到了一条消息:
并且 RabbbitMQ 新增了一个队列 testQueue
4. 获取指定队列的消息
- let mq = new RabbitMQ();
- mq.receiveQueueMsg('testQueue',(msg) =>
- {
- console.log(msg)
- })
- // 输出结果: my first message 复制代码
此时打开 RabbitMQ 管理平台, 消息数量已经变为 0
综上: 我们简单讲述了消息队列及 RabbitMQ 相关的一些知识, 以及我们如何通过 nodejs 来生产与消费消息, 上面讲的比较简单, 之后会发表更多文章讲述消息队列集群搭建及容灾的实现.
来源: https://www.cnblogs.com/wukong-holmes/p/9306733.html