1. 概述
在消息代理(如 RabbitMQ), 由于发送消息的 AMQP 协议方法不能保证消息一定到达对方或被成功处理, 所以发布者和消费者都需要一个交付和处理确认的机制 在上一篇文章中间件系列十 RabbitMQ 之消费者端的消息确认机制, 我们介绍了消费端的消息确认机制本篇我们介绍发送端的消息确认机制
RabbitMQ 在收到消息后, 还需要有一段时间才能将消息存入磁盘之中 RabbitMQ 并不是为每条消息都做 fsync 的处理, 可能仅仅保存到 cache 中而不是物理磁盘上, 在这段时间内 RabbitMQ broker 发生 crash, 消息保存到 cache 但是还没来得及落盘, 那么这些消息将会丢失为了解决这个问题 RabbitMQ 引入发送端消息确认机制, 主要通过事务和 publisher Confirm 机制
本篇的主要内容如下:
1. 通过 AMQP 事务和 publisher Confirm 机制保证发送端的消息不丢失
2. 演示 RabbitMQ 中的事务用法, 并通过抓包分析协议, 最后说明事务的事务的优点和缺点
3. 演示 RabbitMQ 中 Publisher Confirm 模式的用法, 并对以下 3 种方式通过抓包分析协议并说明其缺点和优点
- 同步方式的发送端的单个 Publisher Confirm 模式
- 同步方式的发送端的批量 Publisher Confirm 机制
- 异步方式的发送端的 Publisher Confirm 机制
4. 其他发送者端的注意事项: 否定确认消息确认的时机持久化消息的确认延迟发送顺序
2. 事务机制保证消息不丢失
RabbitMQ 支持事务(transaction), 通过调用 tx.select 方法开启事务模式当开启了事务模式后, 只有当一个消息被所有的镜像队列保存完毕后, RabbitMQ 才会调用 tx.commit-ok 返回给客户端
2.1. 代码
工程名称: rabbitmq
发送端的关键代码 TransactionalSend: 通过 channel.txSelect()开启事务, 发送消息, 最后执行 channel.txCommit()提交事务如果发送失败, 则使用 channel.txRollback()回滚事务
- // 开启事务
- channel.txSelect();
- // 发送消息
- while(num--> 0) {
- // 发送一个持久化消息到特定的交换机
- channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
- System.out.println("[TransactionalSend] Sent + [" + num + "]'" + message + "'");
- }
- // 不注解下面语句, 可以进入 channel.txRollback()逻辑
- // if(true){
- // throw new IOException("consumer channel.txRollback()");
- // }
- // 提交事务
- channel.txCommit();
- }catch(IOException e){
- e.printStackTrace();
- // 回滚事务
- channel.txRollback();
- }
测试代码: 使用 PublisherConfirmTest 的方法进行测试
2.2. 使用 wireshark 抓包分析正常的事务提交流程
使用 wireshark 截获以上测试时产生的包:
所有的包:
全部包如下:
496-497 帧: 开启事务
496 帧: Tx.Select 客户端向 RabbitMQ 要求开启事务
497 帧: Tx.Select-Ok 服务端处理事务请求并返回成功结果
498 帧: 发送消息, 和普通发送消息同, 这里略
499-502 帧: 提交事务
499 帧: Tx.Commit 客户端向 RabbitMQ 要求开启事务
502 帧: Tx.Commit-Ok 服务端处理事务提交并返回成功结果
2.3. 使用 wireshark 抓包分析执行事务失败, 自动回滚
使用 wireshark 截获以上测试时产生的包:
如果事务执行失败, 则会执行 tx.rollback 执行回滚操作
由于包信息比较简单, 这里略, 只上截图
2.4. 事务的优点和缺点
事务的实现简单, 能够保证消息正确到达 RabbitMQ, 但是它的效率低, 只有一般发送消息的效率的 1/250
3. publisher confirms 机制保证消息不丢失
在 AMPQ-0-9-1 中, 有定义从消费者到 RabbitMQ 的处理确认机制但是没有定义消息代理到生产者的确认机制, 在 RabbitMQ 中对此进行扩展, 叫做 publisher confirms 机制
在标准的 AMQP 0-9-1, 保证消息不会丢失的唯一方法是使用事务: 在通道上开启事务, 发布消息, 提交事务但是事务是非常重量级的, 它使得 RabbitMQ 的吞吐量降低 250 倍为了解决这个问题, RabbitMQ 引入的 Publisher Confirms 机制, 它是模仿 AMQP 协议中消费者消息确认机制
生产者端可以通过 confirm.select 来启用方法 Publisher Confirms 机制, RabbitMQ 服务端根据是否设置 no-wait 的值, 返回 confirm.select-ok 一旦在通道上使用 confirm.select 方法, 就认为它处于 Publisher Confirms 模式事务通道不能进入 Publisher Confirms 模式, 一旦通道处于 Publisher Confirms 模式, 不能开启事务即事务和 Publisher Confirms 模式只能二选一
Publisher Confirm 模式有以下几种使用方式:
同步方式的发送端的单个 Publisher Confirm 模式
同步方式的发送端的批量 Publisher Confirm 机制
异步方式的发送端的 Publisher Confirm 机制
4. 同步方式的发送端的 Publisher Confirm 模式
4.1. 测试代码
测试工程名称: rabbitmq
关键代码: SimpleConfirmSend
这个代码实现发送者端发送一个持久化消息到特定的交换机, 然后等待服务端返回 Basic.Ack 后, 才执行发送消息
- while(num--> 0) {
- // 发送一个持久化消息到特定的交换机
- channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
- System.out.println("[SimpleConfirmSend] Sent'" + message + "'");
- // 等待服务端返回 Basic.Ack 后, 才执行下一个循环
- if(!channel.waitForConfirms()){
- System.out.println("message haven't arrived broker");
- // 在这里可以对发送失败的记录进行处理: 如重发
- }
- }
4.2. 使用 wireshark 抓包单个确认消息确认
这里我们发送端只发送一个消息, 并进行抓包, 分析抓到的包, 详细如下:
和普通发送最大的不同是, 在执行发送消息前执行 Confirm.Select,RabbitMQ 在消息已经收到并处理完毕 (如果消息需要, 则持久化消息后, 才返回 Basic.Ok; 如果对应消息的镜像队列, 则队列完全同步后, 才返回 Basic.Ok 总之, 必须保证消息不会因为 RabbitMQ 异常丢失) 后返回 Basic.Ok 给客户端
110 -111 帧:
110 帧 Confirm.Select: 客户端请求开启 Confirm 模式
111 帧 Confirm.Select-Ok: 服务端执行完毕, 并返回成功结果
112 帧: 发送消息
这里我们发现, 这个包里没有 Delivery-Tag 值, 但是后面 RabbitMQ 回送的消息包里有 Delivery-Tag 如果我们一次发送大量的消息, 即使 RabbitMQ 对收到前 N 条消息进行确认, 发送者端也不知道 RabbitMQ 是不是收到发送者端最先发送的 N 条记录, 因为消息到达 RabbbitMQ 是无序的只有 RabbitMQ 对本次发送的所有记录进行确认, 我们才知道消息全部发送成功如果不小心丢失一条消息, 我们是不知道那条记录丢失的, 唯一的办法是本次所有记录重发
113 帧: Basic.Ack
113 帧 RabbitMQ 服务端在处理完毕消息后, 回送 Basic.Ack 这里表示服务端已经收到 delivery-Tag=1 的数据
4.3. 使用 wireshark 抓包发送多个消息抓包分析
这里我们发送端发送 N 个消息, 并进行抓包, 分析抓到的包, 详细如下:
这里和发送单个消息类似, 都是 Basic.Publish 和 Basic.Ack 对最大的不同是 Basic.Ack 消息中 Delivery-Tag 值随着消息数量递增
5. 同步方式的发送端的批量 Publisher Confirm 机制
之前我们是发送一个消息确认一个消息本节我们发送 N 条记录后, 再进行确认
测试工程名称: rabbitmq
关键代码:
- TransactionalSend:
- // ======== 批量确认模式 end ======
- // 发送消息
- while(num--> 0) {
- // 发送一个持久化消息到特定的交换机
- channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
- System.out.println("[SimpleConfirmSend] Sent'" + message + "'");
- }
- // 批量等待确认: 返回 true: 如果所有的消息都收到有确认应答, 没有消息被拒绝
- if(!channel.waitForConfirms()){
- System.out.println("Not all message have arrived broker");
- // 实际应用中, 需要在这是添加发送消息失败的处理逻辑: 如重发等等
- // 在这种的模式中, 如果发送 N 条消息, 如果有一条失败, 则所有的消息都需要重新推送
- }
5.1. 使用 wireshark 抓包分析批量确认机制
这里我们发送端连续发送 10 个消息, 然后再 basic.Ack 进行确认, 再进行抓包, 分析抓到的包, 详细如下
79 - 89 帧 : 连续发送 10 个请求
91-92 帧: 对消息进行应答, 虽然有 10 个消息, 但是为了效率 RabbitMQ 不是对每个包发送一个确认包, 而是进行批量确认
91 帧:
Basic.ACK 对收到的前 5 条已经处理完毕的消息进行确认
Basic.Ack 对收到的前 10 条已经处理完毕的消息进行确认
备注:
这里我们发现, Basic.Ack 的 Delivery-Tag 的值是 RabbitMQ 服务端生成的, 客户端在发送消息的没有这个值, 这里不能保证 Delivery-Tag 值为 1, 是对客户端发送的第一条记录进行确认如 91 帧是 RabbitMQ 对最新处理完毕的前 5 条记录进行应答, 但是不能保证 100% 是对客户端发送的前 5 条记录进行应答 RabbitMQ 可能先处理完 1,2,3,4,6 条记录, 再发送这条确认信息
所以假如我们第二条的 Delivery-Tag 值为 8 且以后不在收到新的 Basic.ACK, 那么表示 2 条消息丢失, 但是我们无法确认 100% 确认丢失的消息是第 9,10 条, 此时我们需要重发这里的全部 10 条记录
6. 异步 Publisher Confirm 机制
测试工程名称: rabbitmq
关键代码 AsynConfirmSend: 通过回调方法进行处理
- // 添加回调对象, 处理返回值
- channel.addConfirmListener(new ConfirmListener(){
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("[AsynConfirmSend] handleAck : deliveryTag =" + deliveryTag + "multiple =" + multiple);
- }
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("[AsynConfirmSend] handleNack : deliveryTag =" + deliveryTag + "multiple =" + multiple);
- }
- });
- // 开启 confirm 模式:
- channel.confirmSelect();
- // 发送消息
- while(num--> 0) {
- // 发送一个持久化消息到特定的交换机
- channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
- System.out.println("[TransactionalSend] Sent'" + message + "'");
- }
这里和上面的同步批量 confirm 的最大不同时, 发送端在发送消息时, 另一个线程同步进行消息的确认使用此接口连续发送 10 个消息, 包的信令和同步批量 confirm 是几乎相同的, 在信令上两者没有本质区别
7. 使用 Publisher Confirms 的其它注意事项
7.1. 否定确认和重新入队
当 RabbitMQ 无法成功的处理消息时, 它会返回生产者端 basic.nack, 而不是 basic.ack 在这种情况下, basic.nack 的字段与 basic.ack 相对应的字段意义相同, 且 requeue 字段是没有意义的是否重发消息由发送者端自己决定
当进入通道进入 Publisher Confirms 模式, 所有的消息只能被 confirmed 确认或者 nack 一次, 每个消息只能同时被 confirmed 和 nack 另外没有机制保证消息需要多久被 confirmed
basic.nack 只有 Erlange 进程在处理队列时发生内部错误时才会被回送
当消息需要重新入队重发时, 如果可能的话, 它将被放置在其原来的队列中的原来位置如果不能(由于当多个消费者共享一个队列时, 其他消费者的也会并发递送和确认消息), 则该消息将被重新排队到靠近排队头的位置
7.2. 消息确认的时机
对于不可路由的消息(即 RabbitMQ 发现收到的消息不能被路由到任务队列), 有两种情况:
a. 消息的 mandatory=false,RabbitMQ 发现消息不可路由后, 马上确认消息, 即发送 basic.ack 或 baisc.nack 给生产者
b. 消息的 mandatory=true,RabbitMQ 发现消息可路由后, 先发送 basic.return, 再确认消息, 即发送 basic.ack 或 baisc.nack 给生产者
对于可路由的消息, 需要同时满足如下所有的条件才可以回送确认消息
1 消息被路由到所有的队列中
2 对于路由到持久队列的持久消息, 需要持久化消息到磁盘
3 如果队列是镜像队列, 则需要将消息同步到所有的队列中
7.3. 持久化消息的确认延迟
持久化的消息路由到持久化队列时, RabbitMQ 会将消息存储到磁盘空间为了保证持久化效率, RabbitMQ 不是来一条存一条, 而是定时批量地持久化消息到磁盘, 这个时间间隔通常是几百毫秒, 或者队列空闲执行消息持久化如果队列支持镜像队列, 则延迟时间更大如果生产者每发送一条消息, 等待 basic.ack 来了再发送一条消息, 则等待时间可以达到几百毫秒
为了提高吞吐量, RabbitMQ 强烈建议应用程序异步处理确认或批量发送消息后再等待未完成的确认
7.4. Publisher Confirms 的顺序
在大多数情况下, RabbitMQ 将以发布的顺序向发布者确认消息 (这适用于发布在单个通道上的消息) 但是, 发布者确认是异步发送的, 并且可以确认一条消息或一组消息 发出确认的确切时间取决于消息的传递模式(持久性与非持久性) 以及消息被路由到的队列的属性所以 RabbitMQ 可能不以发布的顺序向发布者发送确认消息生产者端尽量不要依赖消息确认的顺序做服务
8. 代码
所有的详细代码见 github 代码, 请尽量使用 tag v0.16, 不要使用 master, 因为 master 一直在变, 不能保证文章中代码和 github 上的代码一直相同
来源: http://blog.csdn.net/hry2015/article/details/79462887