1. 前情回顾
RabbitMQ 使用教程(一)RabbitMQ 环境安装配置及 Hello World 示例
RabbitMQ 使用教程(二)RabbitMQ 用户管理, 角色管理及权限设置
RabbitMQ 使用教程 (三) 如何保证消息 99.99% 被发送成功?
在上一篇博客中, 我们讲解了如何通过 RabbitMQ 的生产者确认机制, 保证消息尽可能的成功的发送到 RabbitMQ 服务器, 这只是从源头降低了消息丢失的几率, 并没有真正解决之前提到的问题: 如何保证 RabbitMQ 异常情况 (人为重启, 异常宕机等) 下, 队列和消息不丢失?
2. 本篇概要
要解决该问题, 就要用到 RabbitMQ 中持久化的概念, 所谓持久化, 就是 RabbitMQ 会将内存中的数据 (Exchange 交换器, Queue 队列, Message 消息) 固化到磁盘, 以防异常情况发生时, 数据丢失.
其中, RabblitMQ 的持久化分为三个部分:
交换器 (Exchange) 的持久化
队列 (Queue) 的持久化
消息 (Message) 的持久化
3. 交换器 (Exchange) 的持久化
在上篇博客中, 我们声明 Exchange 的代码是这样的:
- private final static String EXCHANGE_NAME = "normal-confirm-exchange";
- // 创建一个 Exchange
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
这种情况下声明的 Exchange 是非持久化的, 在 RabbitMQ 出现异常情况 (重启, 宕机) 时, 该 Exchange 会丢失, 会影响后续的消息写入该 Exchange, 那么如何设置 Exchange 为持久化的呢? 答案是设置 durable 参数.
durable: 设置是否持久化. durable 设置为 true 表示持久化, 反之是非持久化.
持久化可以将交换器存盘, 在服务器重启的时候不会丢失相关信息.
设置 Exchange 持久化:
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
此时调用的重载方法为:
- public DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException {
- return this.exchangeDeclare(exchange, (String)type, durable, false, (Map)null);
- }
为了能更好的理解, 我们新建个生产类如下:
- package com.zwwhnly.springbootaction.rabbitmq.durable;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class DurableProducer {
- private final static String EXCHANGE_NAME = "durable-exchange";
- private final static String QUEUE_NAME = "durable-queue";
- public static void main(String[] args) throws IOException, TimeoutException {
- // 创建连接
- ConnectionFactory factory = new ConnectionFactory();
- // 设置 RabbitMQ 的主机名
- factory.setHost("localhost");
- // 创建一个连接
- Connection connection = factory.newConnection();
- // 创建一个通道
- Channel channel = connection.createChannel();
- // 创建一个 Exchange
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
- // 发送消息
- String message = "durable exchange test";
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
- // 关闭频道和连接
- channel.close();
- connection.close();
- }
- }
示例代码中, 我们新建了 1 个非持久化的 Exchange,1 个非持久化的 Queue, 并将它们做了绑定, 此时运行代码, Exchange 和 Queue 新建成功, 消息'durable exchange test'也被正确地投递到了队列中:
此时重启下 RabbitMQ 服务, 会发现 Exchange 丢失了:
修改下代码, 将 durable 参数设置为 ture:
- // 创建一个 Exchange
- channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
此时运行完代码, 然后重启下 RabbitMQ 服务, 会发现 Exchange 不再丢失:
4. 队列 (Queue) 的持久化
细心的网友可能会发现, 虽然现在重启 RabbitMQ 服务后, Exchange 不丢失了, 但是队列和消息丢失了, 那么如何解决队列不丢失呢? 答案也是设置 durable 参数.
durable: 设置是否持久化. 为 true 则设置队列为持久化.
持久化的队列会存盘, 在服务器重启的时候可以保证不丢失相关信息.
简单修改下上面声明 Queue 的代码, 将 durable 参数设置为 true:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
此时调用的重载方法如下:
- public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
- validateQueueNameLength(queue);
- return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();
- }
运行代码, 然后重启 RabbitMQ 服务, 会发现队列现在不丢失了:
5. 消息 (Message) 的持久化
虽然现在 RabbitMQ 重启后, Exchange 和 Queue 都不丢失了, 但是存储在 Queue 里的消息却仍然会丢失, 那么如何保证消息不丢失呢? 答案是设置消息的投递模式为 2, 即代表持久化.
修改发送消息的代码为:
- // 发送消息
- String message = "durable exchange test";
- AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
- channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
调用的重载方法为:
- public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
- this.basicPublish(exchange, routingKey, false, props, body);
- }
运行代码, 然后重启 RabbitMQ 服务, 发现此时 Exchange,Queue, 消息都不丢失了:
至此, 我们完美的解决了 RabbitMQ 重启后, 消息丢失的问题.
最终的代码如下, 你也可以通过文末的源码链接下载本文用到的所有源码:
- package com.zwwhnly.springbootaction.rabbitmq.durable;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class DurableProducer {
- private final static String EXCHANGE_NAME = "durable-exchange";
- private final static String QUEUE_NAME = "durable-queue";
- public static void main(String[] args) throws IOException, TimeoutException {
- // 创建连接
- ConnectionFactory factory = new ConnectionFactory();
- // 设置 RabbitMQ 的主机名
- factory.setHost("localhost");
- // 创建一个连接
- Connection connection = factory.newConnection();
- // 创建一个通道
- Channel channel = connection.createChannel();
- // 创建一个 Exchange
- channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
- // 发送消息
- String message = "durable exchange test";
- AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
- channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
- // 关闭频道和连接
- channel.close();
- connection.close();
- }
- }
6. 注意事项
1)理论上可以将所有的消息都设置为持久化, 但是这样会严重影响 RabbitMQ 的性能. 因为写入磁盘的速度比写入内存的速度慢得不止一点点. 对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量. 在选择是否要将消息持久化时, 需要在可靠性和吞吐量之间做一个权衡.
2)将交换器, 队列, 消息都设置了持久化之后仍然不能百分之百保证数据不丢失, 因为当持久化的消息正确存入 RabbitMQ 之后, 还需要一段时间 (虽然很短, 但是不可忽视) 才能存入磁盘之中. 如果在这段时间内 RabbitMQ 服务节点发生了宕机, 重启等异常情况, 消息还没来得及落盘, 那么这些消息将会丢失.
3)单单只设置队列持久化, 重启之后消息会丢失; 单单只设置消息的持久化, 重启之后队列消失, 继而消息也丢失. 单单设置消息持久化而不设置队列的持久化显得毫无意义.
7. 源码
源码地址: https://github.com/zwwhnly/springboot-action.git , 欢迎下载.
8. 参考
《RabbitMQ 实战指南》
来源: https://www.cnblogs.com/zwwhnly/p/10948024.html