一 什么是死信队列
当一条消息在队列中出现以下三种情况的时候, 该消息就会变成一条死信.
消息被拒绝(basic.reject / basic.nack), 并且 requeue = false
消息 TTL 过期
队列达到最大长度
当消息在一个队列中变成一个死信之后, 如果配置了死信队列, 它将被重新 publish 到死信交换机, 死信交换机将死信投递到一个队列上, 这个队列就是死信队列.
二 实现死信队列
2.1 原理图
2.2 创建消费者
创建一个消费者, 绑定消费队列及死信交换机, 交换机默认为 direct 模型, 死信交换机也是, arguments 绑定死信交换机和 key.(注解支持的具体参数文末会附上)
- public class DirectConsumer {
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue(value = "javatrip",arguments =
- {@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
- @Argument(name="x-dead-letter-routing-key",value = "deadKey")
- }),
- exchange = @Exchange(value="javatripDirect"),
- key = {"info","error","warning"}
- )
- })
- public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{
- System.out.println("消费者 1"+message);
- }
2.3 创建生产者
- public void publishMessage(String message){
- rabbitTemplate.setMandatory(true);
- rabbitTemplate.convertAndSend("javatripDirect","info",message);
- }
三 造成死信的三种情况
3.1 拒绝消息, 并且禁止重新入队
设置 YAML 为手动签收模式
- spring:
- rabbitmq:
- listener:
- simple:
- acknowledge-mode: manual
设置拒绝消息并禁止重新入队
- Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
- channel.basicNack(deliverTag,false,false);
绑定死信队列
- @RabbitListener(bindings = {
- @QueueBinding(
- value = @Queue(value = "javatripDead"),
- exchange = @Exchange(value = "deadExchange"),
- key = "deadKey"
- )
- })
- public void receive2(String message){
- System.out.println("我是一条死信:"+message);
- }
3.2 消息 TTL 过期
绑定业务队列的时候, 增加消息的过期时长, 当消息过期后, 消息将被转发到死信队列中.
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue(value = "javatrip",arguments =
- {@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
- @Argument(name="x-dead-letter-routing-key",value = "deadKey"),
- @Argument(name = "x-message-ttl",value = "3000")
- }),
- exchange = @Exchange(value="javatripDirect"),
- key = {"info","error","warning"}
- )
- })
- public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{
- System.out.println("消费者 1"+message);
- }
3.3 队列达到最大长度
设置消息队列长度, 当队列中的消息达到最大长度后, 继续发送消息, 消息将被转发到死信队列中.
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue(value = "javatrip",arguments =
- {@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
- @Argument(name="x-dead-letter-routing-key",value = "deadKey"),
- @Argument(name = "x-max-length",value = "3")
- }),
- exchange = @Exchange(value="javatripDirect"),
- key = {"info","error","warning"}
- )
- })
- public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{
- System.out.println("消费者 1"+message);
- }
四 Spring Boot 整合 RabbitMQ 用到的几个注解
@QueueBinding 作用就是将队列和交换机进行绑定, 主要有以下三个参数:
- @Target({})
- @Retention(RetentionPolicy.RUNTIME)
- public @interface QueueBinding {
- /**
- * @return the queue.
- */
- Queue value();
- /**
- * @return the exchange.
- */
- Exchange exchange();
- /**
- * @return the routing key or pattern for the binding.
- * Multiple elements will result in multiple bindings.
- */
- String[] key() default {};
- }
@Queue 是声明队列及队列的一些属性, 主要参数如下:
- @Target({})
- @Retention(RetentionPolicy.RUNTIME)
- public @interface Queue {
- /**
- * @return the queue name or "" for a generated queue name (default).
- */
- @AliasFor("name")
- String value() default "";
- /**
- * @return the queue name or "" for a generated queue name (default).
- * @since 2.0
- */
- @AliasFor("value")
- String name() default "";
- /**
- * 是否持久化
- */
- String durable() default "";
- /**
- * 是否独享, 排外的.
- */
- String exclusive() default "";
- /**
- * 是否自动删除;
- */
- String autoDelete() default "";
- /**
- * 队列的其他属性参数
- * (1)x-message-ttl: 消息的过期时间, 单位: 毫秒;
- *(2)x-expires: 队列过期时间, 队列在多长时间未被访问将被删除, 单位: 毫秒;
- *(3)x-max-length: 队列最大长度, 超过该最大值, 则将从队列头部开始删除消息;
- *(4)x-max-length-bytes: 队列消息内容占用最大空间, 受限于内存大小, 超过该阈值则从队列头部开始删除消息;
- *(5)x-overflow: 设置队列溢出行为. 这决定了当达到队列的最大长度时消息会发生什么. 有效值是 drop-head,reject-publish 或 reject-publish-dlx. 仲裁队列类型仅支持 drop-head;
- *(6)x-dead-letter-exchange: 死信交换器名称, 过期或被删除 (因队列长度超长或因空间超出阈值) 的消息可指定发送到该交换器中;
- *(7)x-dead-letter-routing-key: 死信消息路由键, 在消息发送到死信交换器时会使用该路由键, 如果不设置, 则使用消息的原来的路由键值
- *(8)x-single-active-consumer: 表示队列是否是单一活动消费者, true 时, 注册的消费组内只有一个消费者消费消息, 其他被忽略, false 时消息循环分发给所有消费者(默认 false)
- *(9)x-max-priority: 队列要支持的最大优先级数; 如果未设置, 队列将不支持消息优先级;
- *(10)x-queue-mode(Lazy mode): 将队列设置为延迟模式, 在磁盘上保留尽可能多的消息, 以减少 RAM 的使用; 如果未设置, 队列将保留内存缓存以尽可能快地传递消息;
- *(11)x-queue-master-locator: 在集群模式下设置镜像队列的主节点信息.
- */
- Argument[] arguments() default {};
- }
@Exchange 是声明交换及交换机的一些属性,
- @Target({})
- @Retention(RetentionPolicy.RUNTIME)
- public @interface Exchange {
- String TRUE = "true";
- String FALSE = "false";
- /**
- * @return the exchange name.
- */
- @AliasFor("name")
- String value() default "";
- /**
- * @return the exchange name.
- * @since 2.0
- */
- @AliasFor("value")
- String name() default "";
- /**
- * 交换机类型, 默认 DIRECT
- */
- String type() default ExchangeTypes.DIRECT;
- /**
- * 是否持久化
- */
- String durable() default TRUE;
- /**
- * 是否自动删除
- */
- String autoDelete() default FALSE;
- /**
- * @return the arguments to apply when declaring this exchange.
- * @since 1.6
- */
- Argument[] arguments() default {};
- }
> 如果觉得文章不错, 欢迎点赞, 留言
> 关注公众号《Java 旅途》, 每日推送精品文章
来源: https://segmentfault.com/a/1190000023532444