前言
RabbitMQ 虽然有对队列及消息等的一些持久化设置, 但其实光光只是这一个是不能够保障数据的可靠性的, 下面我们提出这样的质疑:
(1)RabbitMQ 生产者是不知道自己发布的消息是否已经正确达到服务器呢, 如果中间发生网络异常等情况呢? 消息必然会丢失!
(2)RabbitMQ 如果没有设置队列持久化, RabbitMQ 服务器重后队列的元数据会丢失, 消息自然也会丢失!
(3)RabbitMQ 如果消费者设置自动确认, 即 autoAck 为 true, 那么不管消费者发生什么情况, 该消息会自动从队列中移除, 实际上消费者有可能挂掉, 消息必然会丢失!
(4)RabbitMQ 中的消息如果没有匹配到队列时, 那么消息也会丢失!
本文其实也就是结合以上四个方面进行讲解的, 主要参考《RabbitMQ 实战指南》(有需要 PDF 电子书的可以评论或者私信我), 本文截图也来自其中, 另外可以对一些 RabbitMQ 的概念的认识可以参考我的上两篇博文认识 RabbitMQ 交换机模型, RabbitMQ 是如何运转的?
一, 设置 mandotory 参数, AE 备份交换器
针对前言中的第 (4) 个问题, 我们可以通过设置 mandotory 参数与 AE 备份交换器来解决
1,mandotory 参数
1)当为 true 时, 交换器无法根据自身的类型和路由键找到一个符合条件的队列, 此时 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者, 消息将不会丢失
2)当为 false 时, 消息将会被直接丢弃.
3)RabbitMQ 通过 addReturnListener 添加 ReturnLisener 监听器监听获取没有被正确路由到合适队列的消息.
- channel.basicPublish(EXCHANGE NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN,"mandatory test".getBytes());
- channel.addReturnListener(new ReturnListener(){
- public void handleReturn(int replyCode, String replyText,
- String exchange, String routingKey,
- AMQP.BasicProperties basicProperties,
- byte[] body) throws IOException {
- String message = new String(body);
- System.out.println("Basic.Return 返回的结果是:" + message);
- }
- });
2,AE 备份交换器
Alternate Exchange, 简称 AE, 不设置 mandatory 参数, 那么消息将会被丢失, 设置 mandatory 参数的话, 需要添加 ReturnListner 监听器, 增加复杂代码, 如果既不想增加代码又不想消息丢失, 则使用 AE, 将没有被路由的消息存储于 RabbitMQ 中. 当 mandatory 参数用 AE 一起使用时, mandatory 将失效. 在介绍 AE 之前, 也认识 RabbitMQ 对于消息的过期时间 TTL 设置以及队列的过期时间 TTL 设置
2.1 TTL 过期时间设置
可以对队列设置 TTL 与消息设置 TTL, 其中消息设置 TTL 经常用于死信队列, 延迟队列等高级应用中.
1)设置消息 TTL
设置 TTL 过期时间一般有两种当时: 一是通过队列属性, 对队列中所有消息设置相同的 TTL. 二就是对消息本身单独设置, 每条消息 TTL 不同. 如果一起使用时候, TTL 小的为准, 当一旦超过设置的 TTL 时间时, 就会变成 "死信".
方式一: 针对每条消息设置 TTL 是通过增加 expiration 的属性参数实现的, 不可能像方式二一样扫描整个队列再判断是否过期, 只有当该消息即将被消费时再判定是否过期即可删除, 也就是消息即使已经过期, 但不一定立马被删除!
- AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
- // 持久化消息
- builder deliveryMode(2);
- // 设置 TTL=60000ms
- builder expiration( 60000 );
- AMQP.BasicProperties properties = builder. build();
- channel.basicPublish(exchangeName, routingKey, mandatory, properties, "ttlTestMessage".getBytes());
方式二: 通过队列属性设置消息 TTL 是增加 x-message-ttl 参数实现的, 只需要扫描整个队列头部即可立即删除, 也就是消息一旦过期就会被删除!
- Map<String, Object> argss = new HashMap<String , Object>();
- argss.put("x-message-ttl", 6000);
- channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss) ;
2)设置队列 TTL
通过在队列中添加参数 x-message-ttl 参数实现, 设置队列被自动删除前处于未被使用状态的时间, 注意是队列的使用状态, 并不是消息是否被消费的状态
设置 ttl=30min 的队列, 时间一到 RabbitMQ 会保证队列被删除, 但是不会保证删除的速度有多快.
- Map<String, Object> args = new HashMap<String, Object>{
- );
- args.put("x-expires", 1800000);
- channel.queueDeclare("myqueue", false, false, false, args);
2.2 AE 备份交换器的使用
声明交换器的时候, 添加 alternate-exchange 参数实现, 或通过策略实现. 前者优先级高. 从代码角度需要以下三个步骤, 具体代码如下:
- Map<String, Object> args = new HashMap<String, Object>();
- args.put("a1ternate-exchange", "myAe");
- channe1.exchangeDec1are("norma1Exchange", "direct", true, fa1se, args);
- channe1.exchangeDec1are("myAe", "fanout", true, fa1se, nu11) ;
- channe1.queueDec1are( "norma1Queue", true, fa1se, fa1se, nu11);
- channe1.queueB nd("norma1Queue", "norma1Exchange", "norma1Key");
- channe1.queueDec1are("unroutedQueue", true, fa1se, fa1se, nu11);
1)声明 normalExchange 类型为 direct 的交换器, 类型为 fanout 的 myAe 备份交换器; 并且 normalExchange 的备份交换器为 myAe(备份交换器建议使用 fanout 类型交换器)
2)声明 normalQueue 队列, 声明 unrouteQueue 队列;
3)通过路由键 normalKey 绑定 normalExchange 与 normalQueue, 不适用路由键绑定 unrouteQueue 与 myAe
二, 消费者手动确认
针对前言中第 (3) 个问题, 我们需要在消费者消费完消息后手动进行确认, 保证消息数据不丢失!
1,autoAck 参数设置
1) 当 autoAck 参数为 false 时, 手动确认:
RabbitMQ 会等待消费者显式地回复确认信号后从内存中移去消息(实际上是先标示删除标记, 之后再删除), 这是一般推荐使用的方式, 因为使用手动确认有足够的时间处理消息, 不需要担心消费者进程挂掉之后消息丢失问题. 此时的消息就会分为两个部分: 一是等待投递给消费者的消息; 二是已经投递给消费者但还没有收到消费者确认信号的消息.
2) 当 autoAck 为 true 时, 自动确认:
RabbitMQ 会自动隐式地回复确认信号后从内存中移去消息, RabbitMQ 不需要管消费者是否真正消费了这些消息, RabbitMQ 会自动把发送出去的消息置为确认, 然后直接从内存中删除.
2, 重新投递
问: 如果选择手动确认, 即 autoAck 为 false 时, 消费者由于某些原因断开了, 那么消息的确认会受到影响, 那么此时的消息会丢失吗?
这也就是一开始提出来的问题, 其实是不必担心消息会被丢失, 因为 RabbitMQ 如果一直没收到消费者的确认信号, 并且消费此消息的消费者已经断开, 则 RabbitMQ 会重新安排消息进入队列等待给下一个消费者. 也就是 RabbitMQ 不会设置消息的过期时间(当然也可以设置过期时间, 但与之有关系方式消息丢失的特性是死信队列), 它只判断是否需要重新安排入队列重新投递, 而判断的唯一标准是消费此消息的消费者连接是否已经断开, 即 RabbitMQ 会允许消费一条消息的时间很久很久.
3, 消费者拒绝消息
1)使用 channel.basicReject 方法, 但只能拒绝一条.
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag: 消息的唯一标识
requeue: 表示是否可以拒绝的消息重新存入队列
2)使用 channel.basicNack. 不同于前者, 此方法可以批量拒绝.
void basicNack(long deliveryTag, boolean multiple , boolean requeue) throws IOException;
multiple: 设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息.
3)问: 关键在于, 消费者拒绝消费消息后怎么处理? 是丢弃, 还是重新回到队列呢?
当参数 requeue 设置为 true 时候, 可以重新进入队列, 投递给下一个消费者. 如果为 false, 消息就会把队列中消息立马移除, 再结合启用 "死信队列", 防止消息丢失并且可以分析异常情况的发生.
三, 生产者确认机制
针对前言的第 (1) 个问题, 我们可以通过生产者的确认消息机制来解决, 主要分为两种: 第一是事务机制, 第二是发送方确认机制
1, 事务机制
与事务机制相关的有三种方法, 分别是 channel.txSelect 设置当前信道为事务模式, channel.txCommit 提交事务和 channel.txRollback 事务回滚. 如果事务提交成功, 则消息一定是到达了 RabbitMQ 中, 如果事务提交之前由于发送异常或者其他原因, 捕获后可以进行 channel.txRollback 回滚.
- // 将信道设置为事务模式, 开启事务
- channel.txSelect();
- // 发送持久化消息
- channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, "transaction messages".getBytes());
- // 事务提交
- channel.txCommit();
发生异常之后事务回滚
- try {
- channel.txSelect();
- channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, "transaction messages".getBytes());
- channel.txCommit();
- } catch (Exception e){
- e.printStackTrace();
- channel.txRollback();
- }
2, 确认机制
确认机制相对来说, 相比较代码来说比较复杂. 主要有单条确认, 批量确认, 异步批量确认
----------------------------------------- 未完待续, 写不动了, 休息会再补充下一篇 -------------------------------------------
来源: https://www.cnblogs.com/jian0110/p/10419013.html