一: 确认种类
RabbitMQ 的消息确认有两种.
一种是消息发送确认. 这种是用来确认生产者将消息发送给交换器, 交换器传递给队列的过程中, 消息是否成功投递. 发送确认分为两步, 一是确认是否到达交换器, 二是确认是否到达队列.
第二种是消费接收确认. 这种是确认消费者是否成功消费了队列中的消息.
具体建议参考:
这里我们重点研究下接收确认的情况.
为了保证消息从队列可靠的到达消费者, RabbitMQ 提供了消息 确认机制, 消费者在订阅队列的时候, 可以指定 autoAck 参数, 当 autoACK 等于 false 时, RabbitMQ 会等待显示的恢复确认信号之后才从内存或者磁盘中移除消息 (实质上是先打上删除标记, 之后再删除)
当 autoAck 属性为 true 的时候, RabbitMQ 会自动把发送出去的消息标记为确认, 然后从内存或者磁盘中移除, 而不管消费者有没有收到消息.
采用了这个机制后不用担心消费者接受不到消息的问题因为, 消费者就算是挂了, 消息会一直存在, 消费者服务启动的时候消息就会自动的进行推送, 因为 RabbitMQ 会一直等待持有消息直到消费者显示的调用 Basic.ack 命令为止.
当开启消息确认机制以后, 对于 rabbitmq 服务端而言, 队列中的消息分成了两个部分, 一部分是等待投递给消费者的消息; 一部分是已经投递给消费者的消息. 如果 RabbitMQ 一直没有收到消费者的确认信号, 并且消费者此消息的消费者已经断开连接, 则 RabbitMQ 会安排重新入队列, 等待投递给下一个消费者, 当然也有可能是原来的消费者.
RabbitMQ 除了提供了消息确认机制以外, 在 2.0 以后的版本引入了 Basic.Reject 这个命令, 意思是明确拒绝当前消息而不是确认.
单独的给一个消息接收者设置 ack.
配置文件如下:
- rabbitmq:
- host: 192.168.1.203
- port: 5672
- username: admin
- password: admin
- publisher-confirms: true # 消息发送到交换机确认机制, 是否确认回调
- listener:
- acknowledge-mode :manual # 每条消息必须手工确认
MQ 接收的服务写法:
- @Component
- public class FirstConsumer1 {
- @Autowired
- private FirstSender firstSender;
- @RabbitListener(queues="test_testQueue" )
- public void handleMessage(Message message ,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
- channel.basicAck(tag, false); // 手动确认消息已收到
- String recvStr = new String(message.getBody());
- // 处理消息
- System.out.println("得到消息为"+recvStr);
- // 根据业务逻辑登记判重表, 在业务层面避免消息被二次消费
- }
- }
acknowledgeMode 有三值:
- A,NONE = no acks will be sent (incompatible with channelTransacted=true).
- RabbitMQ calls this "autoack" because the broker assumes all messages are acked without any action from the consumer.
- B,MANUAL = the listener must acknowledge all messages by calling Channel.basicAck().
- C,AUTO = the container will acknowledge the message automatically, unless the MessageListener throws an exception.
Note that acknowledgeMode is complementary to channelTransacted - if the channel is transacted then the broker requires a commit notification in addition to the ack. This is the default mode. See also txSize.
为什么接收的确认这么关键呢? 现在大多数服务都是多台集群 (比如 3 台), 在这种场景下, 如果没有确认的机制, 那么这个队列的消息可能被三台服务器同时取到.
如何处理呢? 接收服务器在接收到消息后立即给 MQ 回复确认, 这样其他服务器就无法获取到这个消息了.[这里其实还是不是太严谨,]
来源: http://www.bubuko.com/infodetail-3258966.html