前言
消息队列在现今数据量超大, 并发量超高的系统中是十分常用的. 本文将会对现时最常用到的几款消息队列框架 ActiveMQ,RabbitMQ,Kafka 进行分析对比.
详细介绍 RabbitMQ 在 Spring 框架下的结构及实现原理, 从 Producer 端的事务, 回调函数 (ConfirmCallback / ReturnCallback) 到 Consumer 端的 MessageListenerContainer 信息接收容器进行详细的分析. 通过对 RabbitTemplate,SimpleMessageListenerContainer,DirectMessageListenerContainer 等常用类型介绍, 深入剖析在消息处理各个传输环节中的原理及注意事项.
并举以实例对死信队列, 持久化操作进行一一介绍.
目录
一, RabbitMQ 与 AMQP 的关系
二, RabbitMQ 的实现原理
三, RabbitMQ 应用实例
四, Producer 端的消息发送与监控
五, Consumer 端的消息接收与监控
六, 死信队列
七, 持久化操作
六, 死信队列
死信队列(Dead-Letter-Exchange) 可被看作是死信交换器. 当消息在一个队列中变成死信后, 它能被重新被发送到特定的交换器中, 这个交换器就是 DLX , 绑定 DLX 的队列就称之为死信队列. 消息变成死信一般是由于以下几种情况:
. 消息被拒绝, requeue 被设置为 false, 可通过上一介绍的 void basicReject (deliveryTag, requeue) 或 void basicNack(deliveryTag,multiple, requeue) 完成设置 ;
. 消息过期;
. 队列超出最大长度.
其实死信队列 DLX 也是一个正常的交换器, 和一般的交换器没有什么区别, 我们可以用一般建立队列的方法, 建立一个死信队列. 然后建立一个正常的队列, 在正常队列中加入参数 x-dead-letter-exchange,x-dead-letter-routing-key 与死信队列进行绑定, 完成绑定后在管理界面 Features 选项中 direct.queue.first 会显示 DLX DLK. 这时当被绑定的队列出现超时, 超长, 或被拒绝时(注意 requeue 被设置为 false 时, 对会激发死信), 信息就会流入死信队列被处理.
具体的例子 Producer 端:
- @Configuration
- public class BindingConfig {
- public final static String Queue_First="direct.queue.first";
- public final static String Exchange_Name="directExchange";
- public final static String Routing_Key_First="directKey1";
- @Bean
- public Queue queueFirst(){
- return new Queue(this.Queue_First);
- }
- @Bean
- public DirectExchange directExchange(){
- return new DirectExchange(this.Exchange_Name);
- }
- @Bean
- public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
- return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
- }
- }
- @Configuration
- public class ConnectionConfig {
- @Value("${spring.rabbitmq.host}")
- public String host;
- @Value("${spring.rabbitmq.port}")
- public int port;
- @Value("${spring.rabbitmq.username}")
- public String username;
- @Value("${spring.rabbitmq.password}")
- public String password;
- @Value("${spring.rabbitmq.virtual-host}")
- public String virtualHost;
- @Bean
- public ConnectionFactory getConnectionFactory(){
- CachingConnectionFactory factory=new CachingConnectionFactory();
- System.out.println(host);
- factory.setHost(host);
- factory.setPort(port);
- factory.setUsername(username);
- factory.setPassword(password);
- factory.setVirtualHost(virtualHost);
- return factory;
- }
- }
- @Controller
- @RequestMapping("/producer")
- public class ProducerController {
- @Autowired
- private RabbitTemplate template;
- @RequestMapping("/send")
- public void send() {
- for(int n=0;n<10;n++){
- template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"Hello World!"
- +String.valueOf(n),getCorrelationData());
- }
- }
- private CorrelationData getCorrelationData(){
- return new CorrelationData(UUID.randomUUID().toString());
- }
- }
Customer 端
- @Configuration
- public class BindingConfig {
- // 普通队列参数
- public final static String Queue_First="direct.queue.first";
- public final static String Exchange_Name="directExchange";
- public final static String Routing_Key_First="directKey1";
- // 死信队列参数
- public final static String Queue_Dead="direct.queue.dead";
- public final static String Exchange_Dead="directDead";
- public final static String Routing_Key_Dead="directDeadKey";
- @Bean
- public Queue queueFirst(){
- Map<String, Object> args=new HashMap<String,Object>();
- // 声明当前死信的 Exchange
- args.put("x-dead-letter-exchange", this.Exchange_Dead);
- // 声明当前队列的死信路由 key
- args.put("x-dead-letter-routing-key", this.Routing_Key_Dead);
- // 把死信队列的参数绑定到当前队列中
- return QueueBuilder.durable(Queue_First).withArguments(args).build();
- }
- @Bean
- public DirectExchange directExchange(){
- return new DirectExchange(this.Exchange_Name);
- }
- @Bean
- public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
- return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
- }
- @Bean
- public Queue queueDead(){
- return new Queue(this.Queue_Dead);
- }
- @Bean
- public DirectExchange directExchangeDead(){
- return new DirectExchange(this.Exchange_Dead);
- }
- @Bean
- public Binding bindingExchangeDead(Queue queueDead,DirectExchange directExchangeDead){
- return BindingBuilder.bind(queueDead).to(directExchangeDead).with(this.Routing_Key_Dead);
- }
- }
- @Configuration
- public class ConnectionConfig {
- @Value("${spring.rabbitmq.host}")
- public String host;
- @Value("${spring.rabbitmq.port}")
- public int port;
- @Value("${spring.rabbitmq.username}")
- public String username;
- @Value("${spring.rabbitmq.password}")
- public String password;
- @Value("${spring.rabbitmq.virtual-host}")
- public String virtualHost;
- @Bean
- public ConnectionFactory getConnectionFactory(){
- CachingConnectionFactory factory=new CachingConnectionFactory();
- factory.setHost(host);
- factory.setPort(port);
- factory.setUsername(username);
- factory.setPassword(password);
- factory.setVirtualHost(virtualHost);
- return factory;
- }
- }
- @Configuration
- public class DirectMessListener {
- @Autowired
- private ConnectionConfig connectionConfig;
- @Autowired
- private RabbitTemplate template;
- private int index=0,normalIndex=0,deadIndex=0;
- @Bean
- public DirectMessageListenerContainer messageContainer(){
- DirectMessageListenerContainer container=new DirectMessageListenerContainer();
- container.setConnectionFactory(connectionConfig.getConnectionFactory());
- // 设置每个队列的 consumer 数量
- container.setConsumersPerQueue(4);
- // 设置每个 consumer 每次的接收的消息数量
- container.setPrefetchCount(10);
- // 使用 MANUAL 手动确认
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
- // 监听队列
- container.addQueueNames(BindingConfig.Queue_First);
- container.addQueueNames(BindingConfig.Queue_Dead);
- container.setConsumerTagStrategy(queue -> "consumer"+(++index));
- container.setMessageListener(new ChannelAwareMessageListener(){
- @Override
- public void onMessage(Message message, com.rabbitmq.client.Channel channel)
- throws Exception {
- MessageProperties prop=message.getMessageProperties();
- if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_First)){
- System.out.println("This is a normal queue!"+(++normalIndex));
- // 把当前的队列转送到死信队列中
- channel.basicReject(prop.getDeliveryTag(), false);
- }
- if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_Dead)){
- System.out.println("This is a dead queue!"+(++deadIndex));
- // 模拟对死信队列处理
- Thread.currentThread().sleep(5000);
- .......
- // 处理完毕
- channel.basicAck(prop.getDeliveryTag(), false);
- }
- }
- });
- return container;
- }
- }
通过管理界面可以看, 信息会先发送到 direct.queue.first, 然后被放进死信队列作处理.
运行结果
添加描述
死信队列最常用的场景可以在订单支付, 流程审批等环节. 例如在 京 *, 淘 * 等平台, 当下单成功后, 客户要在一定的时间内完成支付操作, 否则订单被视作无效, 这些业务流程就可以使用死信队列来处理.
七, 持久化操作
RabbitMq 的持久化操作包含有 Queue 持久化, Message 持久化和 Exchange 持久化三类.
7.1 Queue 的持久化
队列持久化只需要在 Queue 的构造函数 public Queue(String name, boolean durable) 把 durable 参数置为 true 就可实现. 如果队列不设置持久化( (durable 默认为 false), 那么在 RabbitMQ 服务重启之后, 相关队列的元数据会丢失, 此时数据也会丢失.
7.2 Message 持久化
设置了 Queue 持久化以后, 当 RabbitMQ 服务重启之后, 队列依然存在, 但消息已经消失, 可见单单设置队列的持久化而不设置消息持久化显得毫无意义, 所以通常列队持久化会与消息持久化共同使用.
在 RabbitMQ 原生态的框架下, 需要把信息属性设置为 MessageProperties.PERSISTENT TEXT PLAIN 才会实现消息的持久化.
而在 Spring 框架下, 由于在使用回调函数时需要把 Message 重新返回队列再进行处理, 所以 Message 默认已经是持久化的.
7.3 Exchage 的持久化
交换器持久化可通过构造函数 public DirectExchange(String name, boolean durable, boolean autoDelete) 把 durable 参数置为 true 就可实现, 而 autoDelete 则是指在所在消费者都解除订阅的情况下自动删除. 如果交换器不设置持久化, 那么在 RabbitMQ 服务重启之后, 相关的交换器元数据会丢失, 不过消息不会丢失, 只是消息不再发送到该 Exchange . 对一个长期使用的交换器来说, 持久化还是有其必要性的.
本章总结
RabbitMQ 发展至今, 被越来越多的人认可, 这和它在易用性, 扩展性, 可靠性和高可用性等方面的卓著表现是密不可分的.
相比于传统的 ActiveMQ 和分布式 Kafka, 它具有自己独有的特点.
希望文章有帮于大家对 RabbitMQ 消息队列方面有更深入的了解, 在不同的开发环境中灵活运用.
由于时间仓促, 文章当中有不明确的地方或有错漏敬请点明.
来源: https://www.qcloud.com/developer/article/1485646