RabbitMQ 的工作原理
它的基本结构
组成部分说明如下:
Broker: 消息队列服务进程, 此进程包括两个部分: Exchange 和 Queue.
Exchange: 消息队列交换机, 按一定的规则将消息路由转发到某个队列, 对消息进行过虑.
Queue: 消息队列, 存储消息的队列, 消息到达队列并转发给指定的消费方.
Producer: 消息生产者, 即生产方客户端, 生产方客户端将消息发送到 MQ.
Consumer: 消息消费者, 即消费方客户端, 接收 MQ 转发的消息.
Maven 举例配置
- <dependency>
- <groupId>
- com.rabbitmq
- </groupId>
- <artifactId>
- amqp-client
- </artifactId>
- <version>
- 4.0.3
- </version>
- <!-- 此版本与 spring boot 1.5.9 版本匹配 -->
- </dependency>
- <dependency>
- <groupId>
- org.springframework.boot
- </groupId>
- <artifactId>
- spring-boot-starter-logging
- </artifactId>
- </dependency>
生产者举例 Demo
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class RabbitmqConfig {
- public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
- public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
- public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
- public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
- public static final String ROUTINGKEY_SMS="inform.#.sms.#";
- // 声明交换机
- @Bean(EXCHANGE_TOPICS_INFORM)
- public Exchange EXCHANGE_TOPICS_INFORM(){
- //durable(true) 持久化, mq 重启之后交换机还在
- return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
- }
- // 声明 QUEUE_INFORM_EMAIL 队列
- @Bean(QUEUE_INFORM_EMAIL)
- public Queue QUEUE_INFORM_EMAIL(){
- return new Queue(QUEUE_INFORM_EMAIL);
- }
- // 声明 QUEUE_INFORM_SMS 队列
- @Bean(QUEUE_INFORM_SMS)
- public Queue QUEUE_INFORM_SMS(){
- return new Queue(QUEUE_INFORM_SMS);
- }
- //ROUTINGKEY_EMAIL 队列绑定交换机, 指定 routingKey
- @Bean
- public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
- @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
- }
- //ROUTINGKEY_SMS 队列绑定交换机, 指定 routingKey
- @Bean
- public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
- @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
- }
- }
消费者举例 Demo
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class RabbitmqConfig {
- public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
- public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
- public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
- public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
- public static final String ROUTINGKEY_SMS="inform.#.sms.#";
- // 声明交换机
- @Bean(EXCHANGE_TOPICS_INFORM)
- public Exchange EXCHANGE_TOPICS_INFORM(){
- //durable(true) 持久化, mq 重启之后交换机还在
- return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
- }
- // 声明 QUEUE_INFORM_EMAIL 队列
- @Bean(QUEUE_INFORM_EMAIL)
- public Queue QUEUE_INFORM_EMAIL(){
- return new Queue(QUEUE_INFORM_EMAIL);
- }
- // 声明 QUEUE_INFORM_SMS 队列
- @Bean(QUEUE_INFORM_SMS)
- public Queue QUEUE_INFORM_SMS(){
- return new Queue(QUEUE_INFORM_SMS);
- }
- //ROUTINGKEY_EMAIL 队列绑定交换机, 指定 routingKey
- @Bean
- public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
- @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
- }
- //ROUTINGKEY_SMS 队列绑定交换机, 指定 routingKey
- @Bean
- public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
- @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
- }
- }
工作模式
RabbitMQ 有以下几种工作模式 :
- 1,Work queues
- 2,Publish/Subscribe
- 3,Routing
- 4,Topics
- 5,Header
- 6,RPC
- Work queues
work queues 与入门程序相比, 多了一个消费端, 两个消费端共同消费同一个队列中的消息.
应用场景: 对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度.
测试:
1, 使用入门程序, 启动多个消费者.
2, 生产者发送多个消息.
结果:
1, 一条消息只会被一个消费者接收;
2,rabbit 采用轮询的方式将消息是平均发送给消费者的;
3, 消费者在处理完某条消息后, 才会收到下一条消息.
Publish/subscribe 发布订阅模式
发布订阅模式:
1, 每个消费者监听自己的队列.
2, 生产者将消息发给 broker, 由交换机将消息转发到绑定此交换机的每个队列, 每个绑定交换机的队列都将接收
到消息
Routin
路由模式:
1, 每个消费者监听自己的队列, 并且设置 routingkey.
2, 生产者将消息发给交换机, 由交换机根据 routingkey 来转发消息到指定的队列.
这是一种非常灵活的模式, 经常被用到
Topics
路由模式:
1, 每个消费者监听自己的队列, 并且设置带统配符的 routingkey.
2, 生产者将消息发给 broker, 由交换机根据 routingkey 来转发消息到指定的队列.
Header 模式
header 模式与 routing 不同的地方在于, header 模式取消 routingkey, 使用 header 中的 key/value(键值对)匹配
队列.
案例:
根据用户的通知设置去通知用户, 设置接收 Email 的用户只接收 Email, 设置接收 sms 的用户只接收 sms, 设置两种
通知类型都接收的则两种通知都有效.
生产者 Demo:
- Map<String, Object> headers_email = new Hashtable<String, Object>();
- headers_email.put("inform_type", "email");
- Map<String, Object> headers_sms = new Hashtable<String, Object>();
- headers_sms.put("inform_type", "sms");
- channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
- channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
通知 Demo :
- String message = "email inform to user"+i;
- Map<String,Object> headers = new Hashtable<String, Object>();
- headers.put("inform_type", "email");// 匹配 email 通知消费者绑定的 header
- //headers.put("inform_type", "sms");// 匹配 sms 通知消费者绑定的 header
- AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
- properties.headers(headers);
- //Email 通知
- channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
发送邮件消费者 :
- channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
- Map<String, Object> headers_email = new Hashtable<String, Object>();
- headers_email.put("inform_email", "email");
- // 交换机和队列绑定
- channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
- // 指定消费队列
- channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
- RPC
RPC 即客户端远程调用服务端的方法 , 使用 MQ 可以实现 RPC 的异步调用, 基于 Direct 交换机实现, 流程如下:
1, 客户端即是生产者就是消费者, 向 RPC 请求队列发送 RPC 调用消息, 同时监听 RPC 响应队列.
2, 服务端监听 RPC 请求队列的消息, 收到消息后执行服务端的方法, 得到方法返回的结果
3, 服务端将 RPC 方法 的结果发送到 RPC 响应队列
4, 客户端 (RPC 调用方) 监听 RPC 响应队列, 接收到 RPC 调用结果.
来源: http://www.bubuko.com/infodetail-3041010.html