前言
在使用 rabbitmq 时, 我们可以通过消息持久化来解决服务器因异常崩溃而造成的消息丢失. 除此之外, 我们还会遇到一个问题, 当消息生产者发消息发送出去后, 消息到底有没有正确到达服务器呢? 如果不进行特殊配置, 默认情况下发送的消息是不会给生产者返回任何响应的, 也就是默认情况下生产者并不知道消息是否正常到达了服务器. 对于数据必达的需求, 你肯定对消息的来龙去脉都有个了接, 这种情况下就需要用到 rabbitmq 消息确认.
消息确认
rabbitmq 消息确认分为生产者确认和消费者确认.
生产者消费确认提供了两种机制:
通过事务机制实现
通过 confirm 机制实现
事务机制则用到 channel.txSelect,channel.txCommit,channel.txRollback. 可以参考下面 AMQP 协议流转过程(参考 Rabbitmq 实战指南)
事务机制在一条消息发送之后会阻塞发送端, 以等待 rabbitmq 回应, 之后才继续发送下一条消息. 所以相对来说事务机制的性能要差一些. 事务机制会降低 rabbitmq 的吞吐量, 所以又引入了另一种轻量级的方式: confirm 机制.
生产者通过调用 channel.confirmSelect 将信道设置为 confirm 模式, 之后 Rabbitmq 会返回 Confirm.Select-Ok 命令表示同意生产者将当前信道设置为 confirm 模式. 所有被发送的后续消息都被 ack 或 nack 一次. 类似如下代码:
- channel.confirmSelect()
- channel.basicPublish("exchange","routingkey",null,"test".getBytes())
confirm 机制流转过程参考下图(参考 Rabbitmq 实战指南)
消费者确认
消费者在订阅消息队列时指定 autoAck 参数. 当参数设置为 false 时 rabbitmq 会等待消费者显式回复确认信号才会从内存或者磁盘种删除这条消息. 参数默认为 true. 当 autoAck 设置为 false 时, 对于 rabbitmq 服务器而言, 队列中的消息分成了两部分: 一部分是等待投递给消费者的消息, 一部分是已经投递给消费者的消息但是还没有收到确认信号的消息. 可通过 RabbitMQ web 平台查看队列中 Ready 和 UnAck 对应的数量.
消费者消息确认涉及到 3 个方法: channel.basicAck,channel.basicNack,channel.basicReject
SpringBoot 集成 rabbitmq 下实现消息确认
springboot 集成 rabbitmq 实现消息确认主要涉及两个回调方法(ReturnCallback,ConfirmCallback). 这里消费者部分我用两种方式来实现. 一种是基于 SimpleMessageListenerContainer. 另一种就是用 RabbitListener 注解实现.
- 1,application.YAML
- spring:
- rabbitmq:
- host: 192.168.80.128
- port: 5672
- username: admin
- password: admin
- virtual-host: /
- publisher-confirms: true
- publisher-returns: true
- listener:
- simple:
- acknowledge-mode: manual
- concurrency: 1
- max-concurrency: 10
- retry:
- enabled: true
2, 配置文件(这里实现 ReturnCallback,ConfirmCallback)
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
- import org.springframework.amqp.rabbit.listener.API.ChannelAwareMessageListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.lang.Nullable;
- @Configuration
- public class MqConfig {
- private Logger logger= LoggerFactory.getLogger(MqConfig.class);
- @Autowired
- RabbitTemplate rabbitTemplate;
- @Autowired
- ConnectionFactory connectionFactory;
- @Bean
- public Queue queue(){
- return new Queue("testMq",true); // 持久化队列(默认值也是 true)
- }
- @Bean
- public DirectExchange directExchange(){
- return new DirectExchange("testMq",true,false);
- }
- @Bean
- Binding binding(Queue queue,DirectExchange directExchange){
- return BindingBuilder.bind(queue).to(directExchange).with("testMq");
- }
- /**
- * i->replyCode
- * s->replyText
- * s1->exchange
- * s2->routingKey
- * **/
- // 消息从交换器发送到队列失败时触发
- RabbitTemplate.ReturnCallback msgReturnCallback=new RabbitTemplate.ReturnCallback() {
- @Override
- public void returnedMessage(Message message, int i, String s, String s1, String s2) {
- logger.info("消息:{}, 错误码:{}, 失败原因:{}, 交换器:{}, 路由 key:{}",message.getMessageProperties().getCorrelationId(),i,s,s1,s2);
- }
- };
- // 消息发送到交换器时触发
- RabbitTemplate.ConfirmCallback msgConfirmCallback=new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(@Nullable CorrelationData correlationData, boolean b, @Nullable String s) {
- if(b){
- logger.info("消息 {} 发送 exchange 成功",correlationData.getId());
- }else{
- logger.info("消息发送到 exchange 失败, 原因:{}",s);
- }
- }
- };
- /***
- * 消费者确认(方式二)
- * **/
- @Bean
- public SimpleMessageListenerContainer listenerContainer(){
- SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- container.setQueueNames("testMq");
- container.setExposeListenerChannel(true);
- container.setMaxConcurrentConsumers(10);
- container.setConcurrentConsumers(1);
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
- container.setMessageListener(new ChannelAwareMessageListener() {
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try{
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- logger.info("接收消息:{}",new String(message.getBody()));
- }catch (Exception ex){
- //channel.basicReject
- //channel.basicNack
- }
- }
- });
- return container;
- }
- /**
- * 生产者的回调都在这里
- * **/
- @Autowired
- public RabbitTemplate rabbitTemplate(){
- // 消息发送失败后返回到队列中
- rabbitTemplate.setMandatory(true);
- rabbitTemplate.setReturnCallback(msgReturnCallback);
- rabbitTemplate.setConfirmCallback(msgConfirmCallback);
- return rabbitTemplate;
- }
- }
另一种消费端实现方式
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import java.io.IOException;
- @Component
- public class MqConsumer {
- private Logger logger= LoggerFactory.getLogger(MqConsumer.class);
- @RabbitListener(queues = "testMq")
- public void handler(Message message,Channel channel){
- try {
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- logger.info("接收消息:{}",new String(message.getBody()));
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
3, 消息生产者
消息发送时注意生成一个消息 id. 一开始没用到这个参数, 在消息接收时消费者会抛空指针异常
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Controller;
- import org.springframework.Web.bind.annotation.RequestMapping;
- import org.springframework.Web.bind.annotation.ResponseBody;
- import java.util.UUID;
- @Controller
- @RequestMapping("/rabbitMq")
- public class MqController {
- private Logger logger= LoggerFactory.getLogger(MqController.class);
- @Autowired
- RabbitTemplate rabbitTemplate;
- @RequestMapping("/sendMq")
- @ResponseBody
- public String sendMq(){
- /**
- * 这里 exchange,routingkey 都叫 testMq
- * **/
- Object message=null;
- for(int i=0;i<10;i++){
- logger.info("生产者: 第 {} 条消息",i);
- CorrelationData correlationId=new CorrelationData(UUID.randomUUID().toString());
- message="第"+i+"条消息";
- rabbitTemplate.convertAndSend("testMq","testMq",message,correlationId);
- }
- return "sending...";
- }
- }
从运行截图中可以看到生产者和消费者都收到对应的回调消息.
来源: https://www.cnblogs.com/sword-successful/p/10418288.html