一, 消息队列概述
消息 (Message) 是指在应用间传送的数据. 消息可以非常简单, 比如只包含文本字符串, 也可以更复杂, 可能包含嵌入对象.
消息队列 (Message Queue) 是一种应用间的通信方式, 消息发送后可以立即返回, 由消息系统来确保消息的可靠传递. 消息发布者只管把消息发布到 MQ 中而不用管谁来取, 消息使用者只管从 MQ 中取消息而不管是谁发布的. 这样发布者和使用者都不用知道对方的存在.
消息队列中间件是分布式系统中重要的组件, 主要解决异步消息, 流量削峰, 应用耦合等问题. 实现高性能, 高可用, 可伸缩和最终一致性架构. 是大型分布式系统不可缺少的中间件. 目前使用较多的消息队列产品有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 等.
生活中的例子
老式餐厅点餐后需呆在座位上等餐, 中途不能离开去干别的事, 如果离开去干别的事, 餐好了, 点餐人却不知道.
新式餐厅点餐后, 餐厅会提供一个 "电子盘" 给顾客, 顾客可以不用在店里等餐, 可以去附近逛逛, 买买东西, 等餐好了, 手上的 "电子盘" 就会响, 通知顾客可以回去就餐了.
对比以上两种形式, 第二种情形就像消息队列一样, 点完餐以后就可以去处理别的事情, 不用一直在餐厅等着.
二, 消息队列的作用
上面说了消息队列主要解决了异步处理, 流量削峰, 应用耦合等三个方面的问题.
异步处理
场景说明: 用户注册后, 系统要发送注册邮件和注册短信. 传统的方式有两种, 串行模式和并行方式 .
串行模式: 将注册信息存入数据库成功后, 发送注册短信和注册邮件, 以上三个步骤都完成后, 将成功的信息返回给客户端.
并行模式: 将注册信息存入数据库成功后, 同时发送注册短信和注册邮件, 以上三个任务都完成后返回给客户端, 与串行模式的差别是并行模式可以提高处理的时间.
流量削峰
应用耦合
三, Active MQ
下载
安装
直接解压, 然后移动到指定目录即可.
- >tar zxvf apache-activemq-5.15.10-bin.tar.gz
- >mv ./apache-activemq-5.15.10 /usr/local
启动
- >/usr/local/activemq-5.15.10/bin/activemq start
- # 检查启动状态
- [[email protected] bin]# jps
- 3168 Jps
- 2268 activemq.jar
- # activemq 启动的默认端口号 61616
- [[email protected] bin]# lsof -i:61616
- COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
- java 2268 root 132u IPv6 15719 0t0 TCP *:61616 (LISTEN)
其他基本命令
- > activemq restart # 重启
- > activemq stop # 关闭
- > activemq start> /activemq_home/logs/activemq.log # 落地相关信息, 打印日志
指定配置文件的启动
./bin/activemq start xbean:/usr/local/activemq-5.15.10/conf/activemq.xml
后台图形化界面支持
http://127.0.0.1:8161/admin
默认用户名 / 密码, admin/admin
图形化页面相关信息说明
Number Of Pending Messages
等待消费的消息
未出队列的数量
Number Of Consumers
消费者数量
Messages Enqueued
进队消息数, 进入队列的总数包括出队的消息数
Messages Dequeued
出队消息数, 即消费者消费后的消息
四, Java 操作 ActiveMQ
依赖 jar 包
- dependencies {
- compile('org.apache.activemq:activemq-all:5.15.9')
- compile('org.apache.activemq:activemq-pool:5.15.9')
- }
第一种模式: Queue
生产流程
创建连接工厂对象
从工厂中建立一个连接并开启(Connection)
从连接中建立一个会话(Session)
基于会话建立目的地(Queue)
基于会话创建生产者(Producer)
在会话的基础上创建一条消息(Message)
生产者将消息发出
资源关闭
- public class Producer {
- // activemq 服务的地址, 默认通信端口为 61616
- private static final String URL = "tcp://192.168.182.128:61616";
- // 定义队列的名称
- private static final String QUEUE_NAME = "test-Queue";
- public static void main(String[] args) {
- MessageProducer producer = null;
- Session session = null;
- Connection connection = null;
- try {
- // 创建连接工厂对象
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
- // 从工厂中建立一个连接并开启(Connection)
- connection = connectionFactory.createConnection();
- connection.start();
- // 从连接中建立一个会话(Session)
- session = connection.createSession(false, 1);
- // 基于会话建立队列(Queue)
- Queue queue = session.createQueue(QUEUE_NAME);
- // 基于会话创建生产者(Producer)
- producer = session.createProducer(queue);
- for (int i = 0; i <10; i++) {
- // 在会话的基础上创建一条消息(Message)
- TextMessage textMessage = session.createTextMessage("test-mq:" + i);
- // 生产者将消息发出
- producer.send(textMessage);
- }
- } catch (Exception ex) {
- throw new IllegalStateException(ex);
- // 资源关闭
- } finally {
- try {
- if (null != producer) {
- producer.close();
- }
- } catch (JMSException e) {
- e.printStackTrace();
- }
- try {
- if (null != session) {
- session.close();
- }
- } catch (JMSException e) {
- e.printStackTrace();
- }
- try {
- if (null != connection) {
- connection.close();
- }
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
- }
- Producer
执行以上代码后, 我们可以在管理页面上看到如下情况:
消费流程
创建连接工厂对象
从工厂中建立一个连接并开启(Connection)
从连接中建立一个会话(Session)
基于会话建立目的地(Queue)
基于会话创建消费者(Consumer)
消费者接收消息
资源关闭
- public class Consumer {
- // activemq 服务地址, 默认通信端口为 61616
- private static final String URL = "tcp://192.168.182.128:61616";
- // 定义队列的名称
- private static final String QUEUE_NAME = "test-Queue";
- public static void main(String[] args) {
- MessageConsumer consumer = null;
- Session session = null;
- Connection connection = null;
- try {
- // 创建连接工厂对象
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
- // 从工厂中建立一个连接并开启(Connection)
- connection = connectionFactory.createConnection();
- connection.start();
- // 从连接中建立一个会话(Session)
- session = connection.createSession(false, 1);
- // 基于会话建立队列(Queue)
- Queue queue = session.createQueue(QUEUE_NAME);
- // 基于会话创建消费者(Consumer)
- consumer = session.createConsumer(queue);
- // 接收消息的第一种方式, 阻塞式接收
- // Message message = consumer.receive();
- // System.out.println("consumer recive message =" + message);
- // 接收消息的第二种方式, 使用监听器
- consumer.setMessageListener(msg -> {
- TextMessage textMessage = (TextMessage) msg;
- try {
- System.out.println("textMessage =" + textMessage.getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- });
- } catch (Exception ex) {
- throw new IllegalStateException(ex);
- }
- }
- }
- Consumer
执行以上代码后, 我们可以在管理页面上看到如下情况:
我们这次先运行两个 Consumer, 由于 Consumer 种没有关闭资源, 所以会一直保持和 ActiveMQ 的连接.
然后再运行 Producer, 我们来看看现象:
控制台打印的信息中, Consumer1 消费的信息都是偶数的, Consumer2 消费的信息都是奇数的, 一条消息只能被一个 Consumer 消费.
第二种模式: Topic
生产流程
创建连接工厂对象
从工厂中建立一个连接并开启(Connection)
从连接中建立一个会话(Session)
基于会话建立目的地(Topic)
基于会话创建生产者(Producer)
在会话的基础上创建一条消息(Message)
生产者将消息发出
资源关闭
- public class Producer {
- // activemq 服务地址, 默认通信端口为 61616
- private static final String URL = "tcp://192.168.182.128:61616";
- // 定义队列的名称
- private static final String TOPIC_NAME = "test-Topic";
- public static void main(String[] args) {
- MessageProducer producer = null;
- Session session = null;
- Connection connection = null;
- try {
- // 创建连接工厂对象
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
- // 从工厂中建立一个连接并开启(Connection)
- connection = connectionFactory.createConnection();
- connection.start();
- // 从连接中建立一个会话(Session)
- session = connection.createSession(false, 1);
- // 基于会话建立目的地(Topic)
- Topic topic = session.createTopic(TOPIC_NAME);
- // 基于会话创建生产者(Producer)
- producer = session.createProducer(topic);
- for (int i = 0; i <10; i++) {
- // 在会话的基础上创建一条消息(Message)
- TextMessage textMessage = session.createTextMessage("test-topic:" + i);
- // 生产者将消息发出
- producer.send(textMessage);
- }
- } catch (Exception ex) {
- throw new IllegalStateException(ex);
- // 资源关闭
- } finally {
- try {
- if (null != producer) {
- producer.close();
- }
- } catch (JMSException e) {
- e.printStackTrace();
- }
- try {
- if (null != session) {
- session.close();
- }
- } catch (JMSException e) {
- e.printStackTrace();
- }
- try {
- if (null != connection) {
- connection.close();
- }
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
- }
- Producer
消费流程
创建连接工厂对象
从工厂中建立一个连接并开启(Connection)
从连接中建立一个会话(Session)
基于会话建立目的地(Topic)
基于会话创建消费者(Consumer)
消费者接收消息
资源关闭
- public class Consumer1 {
- // activemq 服务的地址, 默认通信端口为 61616
- private static final String URL = "tcp://192.168.182.128:61616";
- // 定义队列的名称
- private static final String TOPIC_NAME = "test-Topic";
- public static void main(String[] args) {
- MessageConsumer consumer = null;
- Session session = null;
- Connection connection = null;
- try {
- // 创建连接工厂对象
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
- // 从工厂中建立一个连接并开启(Connection)
- connection = connectionFactory.createConnection();
- connection.start();
- // 从连接中建立一个会话(Session)
- session = connection.createSession(false, 1);
- // 基于会话建立目的地(Topic)
- Topic topic = session.createTopic(TOPIC_NAME);
- // 基于会话创建消费者(Consumer)
- consumer = session.createConsumer(topic);
- // 接收消息的第一种方式, 阻塞式接收
- // Message message = consumer.receive();
- // System.out.println("consumer recive message =" + message);
- // 接收消息的第二种方式, 使用监听器
- consumer.setMessageListener(msg -> {
- TextMessage textMessage = (TextMessage) msg;
- try {
- System.out.println("textMessage =" + textMessage.getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- });
- } catch (Exception ex) {
- throw new IllegalStateException(ex);
- }
- }
- }
- Consumer
Queue 模式和 Topic 模式, 代码十分相似, 一个是创建 Queue, 而另外一个是创建 Topic.
现在我们来运行三个 Consumer, 再运行 Producer, 来看看现象
控制台打印的信息中, 三个 Consumer 都消费了所有消息, 一条消息只能被多个 Consumer 消费.
五, SpringBoot 整合
来源: http://www.bubuko.com/infodetail-3277461.html