1. 前情回顾
RabbitMQ 使用教程(一)RabbitMQ 环境安装配置及 Hello World 示例
RabbitMQ 使用教程(二)RabbitMQ 用户管理, 角色管理及权限设置
RabbitMQ 使用教程 (三) 如何保证消息 99.99% 被发送成功?
RabbitMQ 使用教程 (四) 如何通过持久化保证消息 99.99% 不丢失?
截止目前, 我们能够保证消息成功地被生产者发送到 RabbitMQ 服务器, 也能保证 RabbitMQ 服务器发生异常 (重启, 宕机等) 后消息不会丢失, 也许你认为现在消息应该很安全了吧? 其实还不够安全, 不信你接着往下看.
2. 本篇概要
其实, 还有 1 种场景需要考虑: 当消费者接收到消息后, 还没处理完业务逻辑, 消费者挂掉了, 那消息也算丢失了?, 比如用户下单, 订单中心发送了 1 个消息到 RabbitMQ 里的队列, 积分中心收到这个消息, 准备给这个下单的用户增加 20 积分, 但积分还没增加成功呢, 积分中心自己挂掉了, 导致数据出现问题.
那么如何解决这种问题呢?
为了保证消息被消费者成功的消费, RabbitMQ 提供了消息确认机制(message acknowledgement), 本文主要讲解 RabbitMQ 中, 如何使用消息确认机制来保证消息被消费者成功的消费, 避免因为消费者突然宕机而引起的消息丢失.
3. 开启显式 Ack 模式
在第 1 篇博客 RabbitMQ 使用教程(一)RabbitMQ 环境安装配置及 Hello World 示例中, 我们开启一个消费者的代码是这样的:
- // 创建队列消费者
- com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("Received Message'" + message + "'");
- }
- };
- channel.basicConsume(QUEUE_NAME, true, consumer);
这里的重点是 channel.basicConsume(QUEUE_NAME, true, consumer); 方法的第 2 个参数, 让我们先看下 basicConsume()的源码:
- public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
- return this.basicConsume(queue, autoAck, "", callback);
- }
这里的 autoAck 参数指的是是否自动确认, 如果设置为 ture,RabbitMQ 会自动把发送出去的消息置为确认, 然后从内存 (或者磁盘) 中删除, 而不管消费者接收到消息是否处理成功; 如果设置为 false,RabbitMQ 会等待消费者显式的回复确认信号后才会从内存 (或者磁盘) 中删除.
建议将 autoAck 设置为 false, 这样消费者就有足够的时间处理消息, 不用担心处理消息过程中消费者宕机造成消息丢失.
此时, 队列里的消息就分成了 2 个部分:
等待投递给消费者的消息(下图中的 Ready 部分)
已经投递给消费者, 但是还没有收到消费者确认信号的消息(下图中的 Unacked 部分)
如果 RabbitMQ 一直没有收到消费者的确认信号, 并且消费此消息的消费者已经断开连接, 则 RabbitMQ 会安排该消息重新进入队列, 等待投递给下一个消费者, 当然也有可能还是原来的那个消费者.
RabbitMQ 不会为未确认的消息设置过期时间, 它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开, 这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久.
为了便于理解, 我们举个具体的例子, 生产者的话的我们延用上文中的 DurableProducer:
- package com.zwwhnly.springbootaction.rabbitmq.durable;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class DurableProducer {
- private final static String EXCHANGE_NAME = "durable-exchange";
- private final static String QUEUE_NAME = "durable-queue";
- public static void main(String[] args) throws IOException, TimeoutException {
- // 创建连接
- ConnectionFactory factory = new ConnectionFactory();
- // 设置 RabbitMQ 的主机名
- factory.setHost("localhost");
- // 创建一个连接
- Connection connection = factory.newConnection();
- // 创建一个通道
- Channel channel = connection.createChannel();
- // 创建一个 Exchange
- channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
- // 发送消息
- String message = "durable exchange test";
- AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
- channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
- // 关闭频道和连接
- channel.close();
- connection.close();
- }
- }
然后新建一个消费者 AckConsumer 类:
- package com.zwwhnly.springbootaction.rabbitmq.ack;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class AckConsumer {
- private final static String QUEUE_NAME = "durable-queue";
- public static void main(String[] args) throws IOException, TimeoutException {
- // 创建连接
- ConnectionFactory factory = new ConnectionFactory();
- // 设置 RabbitMQ 的主机名
- factory.setHost("localhost");
- // 创建一个连接
- Connection connection = factory.newConnection();
- // 创建一个通道
- Channel channel = connection.createChannel();
- // 创建队列消费者
- com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- int result = 1 / 0;
- System.out.println("Received Message'" + message + "'");
- }
- };
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
- }
我们先将 autoAck 参数设置为 ture, 即自动确认, 并在消费消息时故意写个异常, 然后先运行生产者客户端将消息写入队列中, 然后运行消费者客户端, 发现消息未消费成功但是却消失了:
然后我们将 autoAck 设置为 false:
channel.basicConsume(QUEUE_NAME, false, consumer);
再次运行生产者客户端将消息写入队列中, 然后运行消费者客户端, 此时虽然消费者客户端仍然代码异常, 但是消息仍然在队列中:
然后我们删除掉消费者客户端中的异常代码, 重新启动消费者客户端, 发现消息消费成功了, 但是消息一直未 Ack:
手动停掉消费者客户端, 发现消息又到了 Ready 状态, 准备重新投递:
之所以消费掉消息, 却一直还是 Unacked 状态, 是因为我们没在代码中添加显式的 Ack 代码:
- String message = new String(body, "UTF-8");
- //int result = 1 / 0;
- System.out.println("Received Message'" + message + "'");
- long deliveryTag = envelope.getDeliveryTag();
- channel.basicAck(deliveryTag, false);
deliveryTag 可以看做消息的编号, 它是一个 64 位的长整形值.
此时运行消费者客户端, 发现消息消费成功, 并且在队列中被移除:
4. 源码
源码地址: https://github.com/zwwhnly/springboot-action.git , 欢迎下载.
5. 参考
《RabbitMQ 实战指南》
来源: https://www.cnblogs.com/zwwhnly/p/10953388.html