本章重点:
可靠性投递
1. 确保消息发送到 RabbitMQ 服务器
2. 确保消息被正确的路由
3. 确保消息在队列正确地存储
4. 确保消息从队列正确地投递到消费者
5. 消费者回调
6. 补偿机制
7. 消息幂等性
8. 消息的顺序性
可靠性投递
首先需要明确, 效率和可靠性是无法兼得的, 如果要保证每一个环节都成功, 势必会对消息的收发效率造成影响, 如过是一些业务实时性要求不是特别高的场合, 可以牺牲可靠性来换取效率.
1代表消息从生产者发送到 Exchange
2代表消息从 Exchange 路由到 Queue
3 代表消息在 Queue 中存储;
4 代表消费者订阅 Queue 并消费消息.
1. 确保消息发送到 RabbitMQ 服务器
可能因为网络或者 Broker 的问题导致1失败, 而生产者是无法得知消息是否正确发送到 Broker 的.
有两种解决方案:
第一种是 Transaction 事务模式
第二种是 Confirm 确认模式
1. 在通过 channel.txSelect 方法开启事务之后, 我们便可以发布消息给 RabbitMQ 了, 如果事务提交成功, 则消息一定 到达了 RabbitMQ 中, 如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常, 这个时候我们便可以将其捕获, 进而通过执行 channel.txRollback 方法来实现事务回滚. 使用事务机制的话会 "吸干"RabbitMQ 的性 能, 一般不建议使用.
2. 生产者通过调用 channel.confirmSelect 方法 (即 Confirm.Select 命令) 将信道设置为 confirm 模式. 一旦消息被投递到所有匹配的队列之后, RabbitMQ 就会发送一个确认 (Basic.Ack) 给生产者(包含消息的唯一 ID), 这就使得生产者知晓消息已经正确到达了目的地了.
2. 确保消息被正确的路由
可能因为路由关键字错误, 或者队列不存在, 或者队列名称错误导致2失败.
使用 mandatory 参数和 ReturnListener, 可以实现消息无法路由的时候返回给生产者.
另一种方式就是使用备份交换机(alternate-exchange), 无法路由的消息会发送到这个交换机上.
- Map<String,Object> arguments = new HashMap<String,Object>();
- // 指定交换机的备份交换机
- arguments.put("alternate-exchange","ALTERNATE_EXCHANGE");
- channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);
3. 确保消息在队列正确地存储
可能因为系统宕机, 重启, 关闭等等情况导致存储在队列的消息丢失, 即3出现问题.
解决方案:
1. 队列持久化
- // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
2. 交换机持久化
- // String exchange, boolean durable
- channel.exchangeDeclare("MY_EXCHANGE","true");
3. 消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties .Builder() // 2 代表持久化, 其他代表瞬态 .deliveryMode(2) .build(); channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
4. 确保消息从队列正确地投递到消费者
如果消费者收到消息后未来得及处理即发生异常, 或者处理过程中发生异常, 会导致4失败.
为了保证消息从队列可靠性到达消费者, RabbitMQ 提供了消息确认机制(message acknowledgement), 消费者在订阅队列时, 可以指定 autoAck 参数, 当 autoAck 等于 false 时, RabbitMQ 会等待消费者显示地回复确认消息才从队列中删除该消息.
如果消息消费失败, 也可以调用 Basic.Reject 或者 BasicNack 来拒绝当前消息而不是确认, 如果 requere 参数为 true, 可以把这条消息重新存入队列, 以便发送给下一个消费者.
5. 消费者回调
消费者处理消息之后, 可以再发送一条消息给生产者, 或者调用生产者地 API, 告知消息处理完毕.
6. 补偿机制
对于一定时间没有响应地消息, 可以设置一个定时重发地机制, 但是要控制次数, 比如最多重复三次, 否则会造成消息堆积.
7. 消息幂等性
服务端是没有这种控制的, 只能在消费端控制.
如何避免消息的重复消费?
消息重复消费可能会有两个原因:
生产者的问题. 环节1重复发送消息, 比如在开启 Confirm 模式但未收到确认
环节4出了问题, 由于消费者未发送 ACK 或者其它原因, 消息重复投递
对于重复发送的消息, 可以对每一条消息生成一个唯一的业务 id, 通过日志或者建表来做重复控制.
8. 消息的顺序性
消息的顺序性是指消费者消费消息的顺序跟生产者投递消息的顺序是一致的.
在 RabbitMQ 中, 一个队列有多个消费者时, 由于不同的消费者消费消息的速度是不一样的, 顺序无法保证
读者分享
觉得不错的朋友可以点点左下角的拇指小赞一下, 同时在这给大家分享一些免费的架构资料 (包括 视频, 课件, 面试专题, 学习笔记等) 关注官方微信公众号, 那里每天都会有技术干货, 技术动向, 职业生涯, 行业热点, 职场趣事等一切有关于程序员的内容分享. 更有海量 Java 架构, 移动互联网架构相关源码视频, 面试资料, 电子书籍截止于 4 月 28 日免费发放. 学习资源丰富实用, 有需要的朋友可以来关注, 扫描下方二维码关注 wx 公众号免费获取↓↓↓
资源大本营↓↓↓
Java 架构资料
Java 源码解析, 到各种框架学习, 再到项目实战, 一应俱全, 包括但不限于: Spring,Mybatis 等源码, Java 进阶, Java 架构师, 虚拟机, 性能优化, 并发编程, 数据结构和算法.
移动互联网架构资料
Android 进阶, Android 架构, App 开发, NDK 模块开发, 小程序开发, 微信开发.
来源: http://www.jianshu.com/p/84e3f3b1bea5