我们都知道, 消息从生产端到消费端消费要经过 3 个步骤:
生产端发送消息到 RabbitMQ;
RabbitMQ 发送消息到消费端;
消费端消费这条消息;
这 3 个步骤中的每一步都有可能导致消息丢失, 消息丢失不可怕, 可怕的是丢失了我们还不知道, 所以要有一些措施来保证系统的可靠性. 这里的可靠并不是一定就 100% 不丢失了, 磁盘损坏, 机房爆炸等等都能导致数据丢失, 当然这种都是极小概率发生, 能做到 99.999999% 消息不丢失, 就是可靠的了. 下面来具体分析一下问题以及解决方案.
生产端可靠性投递
生产端可靠性投递, 即生产端要确保将消息正确投递到 RabbitMQ 中. 生产端投递的消息丢失的原因有很多, 比如消息在网络传输的过程中发生网络故障消息丢失, 或者消息投递到 RabbitMQ 时 RabbitMQ 挂了, 那消息也可能丢失, 而我们根本不知道发生了什么. 针对以上情况, RabbitMQ 本身提供了一些机制.
事务消息机制
事务消息机制由于会严重降低性能, 所以一般不采用这种方法, 我就不介绍了, 而采用另一种轻量级的解决方案 --confirm 消息确认机制.
confirm 消息确认机制
什么是 confirm 消息确认机制? 顾名思义, 就是生产端投递的消息一旦投递到 RabbitMQ 后, RabbitMQ 就会发送一个确认消息给生产端, 让生产端知道我已经收到消息了, 否则这条消息就可能已经丢失了, 需要生产端重新发送消息了.
通过下面这句代码来开启确认模式:
channel.confirmSelect();// 开启发送方确认模式
然后异步监听确认和未确认的消息:
- channel.addConfirmListener(new ConfirmListener() {
- // 消息正确到达 broker
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("已收到消息");
- // 做一些其他处理
- }
- //RabbitMQ 因为自身内部错误导致消息丢失, 就会发送一条 nack 消息
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("未确认消息, 标识:" + deliveryTag);
- // 做一些其他处理, 比如消息重发等
- }
- });
这样就可以让生产端感知到消息是否投递到 RabbitMQ 中了, 当然这样还不够, 稍后我会说一下极端情况.
消息持久化
那消息持久化呢? 我们知道, RabbitMQ 收到消息后将这个消息暂时存在了内存中, 那这就会有个问题, 如果 RabbitMQ 挂了, 那重启后数据就丢失了, 所以相关的数据应该持久化到硬盘中, 这样就算 RabbitMQ 重启后也可以到硬盘中取数据恢复. 那如何持久化呢?
message 消息到达 RabbitMQ 后先是到 exchange 交换机中, 然后路由给 queue 队列, 最后发送给消费端.
所有需要给 exchange,queue 和 message 都进行持久化:
exchange 持久化:
- // 第三个参数 true 表示这个 exchange 持久化
- channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
queue 持久化:
- // 第二个参数 true 表示这个 queue 持久化
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
message 持久化:
- // 第三个参数 MessageProperties.PERSISTENT_TEXT_PLAIN 表示这条消息持久化
- channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
这样, 如果 RabbitMQ 收到消息后挂了, 重启后会自行恢复消息.
到此, RabbitMQ 提供的几种机制都介绍完了, 但这样还不足以保证消息可靠性投递 RabbitMQ 中, 上面我也提到了会有极端情况, 比如 RabbitMQ 收到消息还没来得及将消息持久化到硬盘时, RabbitMQ 挂了, 这样消息还是丢失了, 或者 RabbitMQ 在发送确认消息给生产端的过程中, 由于网络故障而导致生产端没有收到确认消息, 这样生产端就不知道 RabbitMQ 到底有没有收到消息, 就不好做接下来的处理.
所以除了 RabbitMQ 提供的一些机制外, 我们自己也要做一些消息补偿机制, 以应对一些极端情况. 接下来我就介绍其中的一种解决方案 -- 消息入库.
消息入库
消息入库, 顾名思义就是将要发送的消息保存到数据库中.
首先发送消息前先将消息保存到数据库中, 有一个状态字段 status=0, 表示生产端将消息发送给了 RabbitMQ 但还没收到确认; 在生产端收到确认后将 status 设为 1, 表示 RabbitMQ 已收到消息. 这里有可能会出现上面说的两种情况, 所以生产端这边开一个定时器, 定时检索消息表, 将 status=0 并且超过固定时间后 (可能消息刚发出去还没来得及确认这边定时器刚好检索到这条 status=0 的消息, 所以给个时间) 还没收到确认的消息取出重发(第二种情况下这里会造成消息重复, 消费者端要做幂等性), 可能重发还会失败, 所以可以做一个最大重发次数, 超过就做另外的处理.
这样消息就可以可靠性投递到 RabbitMQ 中了, 而生产端也可以感知到了.
消费端消息不丢失
既然已经可以让生产端 100% 可靠性投递到 RabbitMQ 了, 那接下来就改看看消费端的了, 如何让消费端不丢失消息.
默认情况下, 以下 3 种情况会导致消息丢失:
在 RabbitMQ 将消息发出后, 消费端还没接收到消息之前, 发生网络故障, 消费端与 RabbitMQ 断开连接, 此时消息会丢失;
在 RabbitMQ 将消息发出后, 消费端还没接收到消息之前, 消费端挂了, 此时消息会丢失;
消费端正确接收到消息, 但在处理消息的过程中发生异常或宕机了, 消息也会丢失.
其实, 上述 3 中情况导致消息丢失归根结底是因为 RabbitMQ 的自动 ack 机制, 即默认 RabbitMQ 在消息发出后就立即将这条消息删除, 而不管消费端是否接收到, 是否处理完, 导致消费端消息丢失时 RabbitMQ 自己又没有这条消息了.
所以就需要将自动 ack 机制改为手动 ack 机制.
消费端手动确认消息:
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- try {
- // 接收到消息, 做处理
- // 手动确认
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- } catch (Exception e) {
- // 出错处理, 这里可以让消息重回队列重新发送或直接丢弃消息
- }
- };
- // 第二个参数 autoAck 设为 false 表示关闭自动确认机制, 需手动确认
- channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
- });
这样, 当 autoAck 参数置为 false, 对于 RabbitMQ 服务端而言, 队列中的消息分成了两个部分: 一部分是等待投递给消费端的消息; 一部分是已经投递给消费端, 但是还没有收到消费端确认信号的消息. 如果 RabbitMQ 一直没有收到消费端的确认信号, 并且消费此消息的消费端已经断开连接或宕机(RabbitMQ 会自己感知到), 则 RabbitMQ 会安排该消息重新进入队列(放在队列头部), 等待投递给下一个消费者, 当然也有能还是原来的那个消费端, 当然消费端也需要确保幂等性.
好了, 到此从生产端到 RabbitMQ 再到消费端的全链路, 就可以保证数据的不丢失.
由于个人水平有限, 有些地方可能理解错了或理解不到位的, 请大家多多指出! Thanks
来源: http://www.bubuko.com/infodetail-2987718.html