在说死信队列之前, 我们先介绍下为什么需要用死信队列.
如果想直接了解死信对接, 直接跳入下文的 "死信队列" 部分即可.
ack 机制和 requeue-rejected 属性
我们还是基于上篇《Spring Boot 系列 --7 步集成 RabbitMQ》的 demo 代码来说.
在项目我们看到 application.YAML 文件部分配置内容如下
- ...
- listener:
- type: simple
- simple:
- acknowledge-mode: auto
- concurrency: 5
- default-requeue-rejected: true
- max-concurrency: 100
- ...
其中
acknowledge-mode
该配置项是用来表示消息确认方式, 其有三种配置方式, 分别是 none,manual 和 auto.
none 意味着没有任何的应答会被发送.
manual 意味着监听者必须通过调用 Channel.basicAck() 来告知所有的消息.
auto 意味着容器会自动应答, 除非 MessageListener 抛出异常, 这是默认配置方式.
default-requeue-rejected
该配置项是决定由于监听器抛出异常而拒绝的消息是否被重新放回队列. 默认值为 true.
我一开始对于这个属性有个误解, 我以为 rejected 是表示拒绝, 所以将 requeue-rejected 连起来是拒绝重新放回队列, 后来查了资料明白这个属性的功能才想起来 rejected 是个形容词, 其表示的应该是被拒绝的消息
所以如果该属性配置为 true 表示会重新放回队列, 如果配置为 false 表示不会放回队列.
下面我们看看 acknowledge-mode 参数和 default-requeue-rejected 参数使用不同的组合方式, RabbitMQ 是如何处理消息的.
代码依然使用 springboot-demo 中的 RabbitApplicationTests 发送消息, 使用 Receiver 类监听 demo-queue 队列的消息.
对于 Receiver 类添加了一行代码, 该代码模拟抛出异常
- @Component
- public class Receiver {
- @RabbitListener(queues = "demo_queue")
- public void created(String message) {
- System.out.println("orignal message:" + message);
- int i = 1/0;
- }
- }
- acknowledge-mode=none, default-requeue-rejected=false
该配置不会确认消息是否正常消费, 所以在控制台没有抛出任何异常. 通过在 RabbitMQ 管理页面也没有看到重新放回队列的消息
acknowledge-mode=none, default-requeue-rejected=true
同样该配置不会确认消息是否正常消费, 所以在控制台没有抛出任何异常. 而且即使 default-requeue-rejected 配置为 true 因为没有确认所以也没有看到重新放回队列的消息
acknowledge-mode=manual, default-requeue-rejected=false
该配置需要手动确认消息是否正常消费, 但是代码中并没有手动确认, 个人理解是因为没有收到 ack, 所以消息又回到了队列中.
acknowledge-mode=manual, default-requeue-rejected=true
该配置需要手动确认消息是否正常消费, 但是代码中并没有手动确认, 所以消息被重新放入到队列中了, 并且在控制台发现还抛出了异常 (这块不是很清楚, default-requeue-rejected 设置 true 和 false 带来的不同效果, 有了解的麻烦下方留言指教).
acknowledge-mode=auto, default-requeue-rejected=false
该配置采用自动确认, 从结果来看, 是自动确认了.
从控制台打印的结果可以看出 Receiver 方法执行了 3 次, 分别是前面两条放回队列的消息以及这次发送的消息, 所以 3 条消息都消费了.
同时因为 default-requeue-rejected 设置为 false, 所以即使消费抛出异常, 也没有将消息放回队列.
acknowledge-mode=auto, default-requeue-rejected=true
该配置同样采用自动确认, 从结果看出, 没有抛出异常 (这块也不是很理解), 且因为 default-requeue-rejected 设置为 true, 所以消息重新回到队列.
综上罗列这么多情况只为说明有些情况下, 如果消息消费出错, 因为配置问题导致消息丢失了. 这在很多情况下是要命的, 比如用户支付的订单号, 如果因为抛异常等原因直接丢失是很要命的.
所以, 我们需要有一个确保机制, 能够保证即使失败的消息也能保存下来, 这时候死信队列就排上用场了.
死信队列
死信队列的整个设计思路是这样的
生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX 交换机 --> 队列 --> 消费者
下面我们通过网上的一个简单的死信队列的实现看看如何使用死信队列.
- @Bean("deadLetterExchange")
- public Exchange deadLetterExchange() {
- return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
- }
- @Bean("deadLetterQueue")
- public Queue deadLetterQueue() {
- Map<String, Object> args = new HashMap<>(2);
- // x-dead-letter-exchange 声明 死信交换机
- args.put("x-dead-letter-exchange", "DL_EXCHANGE");
- // x-dead-letter-routing-key 声明 死信路由键
- args.put("x-dead-letter-routing-key", "KEY_R");
- return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
- }
- @Bean("redirectQueue")
- public Queue redirectQueue() {
- return QueueBuilder.durable("REDIRECT_QUEUE").build();
- }
- /**
- * 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
- *
- * @return the binding
- */
- @Bean
- public Binding deadLetterBinding() {
- return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
- }
- /**
- * 死信路由通过 KEY_R 绑定键绑定到死信队列上.
- *
- * @return the binding
- */
- @Bean
- public Binding redirectBinding() {
- return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
- }
注意
声明了一个 direct 模式的 exchange.
声明了一个死信队列 deadLetterQueue, 该队列配置了一些属性 x-dead-letter-exchange 表明死信交换机, x-dead-letter-routing-key 表明死信路由键, 因为是 direct 模式, 所以需要设置这个路由键.
声明了一个替补队列 redirectQueue, 变成死信的消息最终就是存放在这个队列的.
声明绑定关系, 分别是死信队列以及替补队列和交换机的绑定.
那么如何模拟生成一个死信消息呢, 可以在发送到 DL_QUEUE 的消息在 10 秒后失效, 然后转发到替补队列中, 代码实现如下
- public void sendMsg(String content) {
- CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
- MessagePostProcessor messagePostProcessor = message -> {
- MessageProperties messageProperties = message.getMessageProperties();
- // 设置编码
- messageProperties.setContentEncoding("utf-8");
- // 设置过期时间 10*1000 毫秒
- messageProperties.setExpiration("5000");
- return message;
- };
- rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", content, messagePostProcessor);
- }
执行结果如下
消息首先进入 DL_QUEUE,5 秒后失效, 被转发到 REDIRECT_QUEUE 中.
来源: https://www.cnblogs.com/bigdataZJ/p/springboot-deadletter-queue.html