这篇是 SpringBoot 整合消息队列的第一篇文章, 我们详细介绍下消息队列的相关内容.
消息队列简介
1. 什么是消息队列
MQ(Message Quene): 通过典型的生产者和消费者模型, 生产者不断向消息队列中产生消息, 消费者不断的从队列中获取消息. 因为生产者和消费者都是异步的, 而且生产者只关心消息的发送, 消费者只关心消息的接收, 没有业务逻辑的侵入, 轻松实现业务解耦.
2. 消息队列有什么用
异步处理
场景描述: 某商场具有注册功能, 注册的时候需要发送短信验证码.
传统的做法是用户提交信息到用户服务, 用户服务调用短信服务发送短信, 然后给用户返回响应, 这种是同步的处理方式, 耗时较长. 加入消息队列后, 用户直接提交信息到用户服务, 将信息写入消息队列, 直接给用户返回响应, 短信服务从消息队列中读取消息进行发送短信.
应用解耦
场景描述: 某商场下单流程.
传统做法是用户下单, 订单系统去查询库存系统, 如果库存系统宕机了, 则下单失败, 损失订单量. 加入消息队列后, 用户下单, 订单系统记录订单, 将订单信息写入消息队列, 下单成功, 然后库存系统恢复正常后去操作数据库库存(不考虑库存为 0 的情况). 这样订单系统和库存系统就达到松耦合的目的了
流量削峰
场景描述: 秒杀活动.
流量过大肯定会导致响应超时或系统宕机, 加入消息队列, 用户秒杀请求写入消息队列, 设置消息队列的长度等属性, 达到消息队列最大长度后, 直接返回秒杀失败, 然后再去消费消息队列的数据, 完成秒杀.
RabbitMQ 简介
RabbitMQ 是用 Erlang 语言编写的, 实现了高级消息队列协议 (AMQP) 的消息中间件.
1. AMQP 协议概念
AMQP:AMQP 是一种链接协议, 直接定义网络交换的数据格式, 这使得实现了 AMQP 的 provider 本身就是跨平台的. 以下是 AMQP 协议模型:
server - 又称 broker, 接收客户端的链接, 实现 amqp 实体服务.
Connection - 链接, 应用程序跟 broker 的网络链接.
channel - 网络信道, 几乎所有的操作都是在 channel 中进行, 数据的流转都要在 channel 上进行. channel 是进行消息读写的通道. 客户端可以建立多个 channel, 每个 channel 代表一个会话任务.
message - 消息, 服务器与应用程序之间传送的数据. 由 properties 和 body 组成. properties 可以对消息进行修饰, 比如消息的升级, 延迟等高级特性. body 就是消息体的内容.
virtual host - 虚拟主机, 用于进行逻辑隔离, 最上层的消息路由, 一个虚拟地址里面可以有多个交换机. exchange 和消息队列 message quene.
exchange - 交换机, 接收消息, 根据路由器转发消息到绑定的队列.
binding - 绑定, 交换机和队列之间的虚拟链接, 绑定中可以包含 routing key.
routing key - 一个路由规则, 虚拟机可以用它来确定 jiekyi 如何路由一个特定消息.
quene - 消息队列, 保存消息并将它们转发给消费者.
2. RabbitMQ 的消息模型
1. 简单模型
在上图中:
p: 生成者
C: 消费者
红色部分: quene, 消息队列
2. 工作模型
在上图中:
p: 生成者
C1,C2: 消费者
红色部分: quene, 消息队列
当消息处理比较耗时时, 就会出现生产消息的速度远远大于消费消息的速度, 这样就会出现消息堆积, 无法及时处理. 这时就可以让多个消费者绑定一个队列, 去消费消息, 队列中的消息一旦消费就会丢失, 因此任务不会重复执行.
3. 广播模型(fanout)
这种模型中生产者发送的消息所有消费者都可以消费.
在上图中:
p: 生成者
X: 交换机
C1,C2: 消费者
红色部分: quene, 消息队列
4. 路由模型(routing)
这种模型消费者发送的消息, 不同类型的消息可以由不同的消费者去消费.
在上图中:
p: 生成者
X: 交换机, 接收到生产者的消息后将消息投递给与 routing key 完全匹配的队列
C1,C2: 消费者
红色部分: quene, 消息队列
5. 订阅模型(topic)
这种模型和 direct 模型一样, 都是可以根据 routing key 将消息路由到不同的队列, 只不过这种模型可以让队列绑定 routing key 的时候使用通配符. 这种类型的 routing key 都是由一个或多个单词组成, 多个单词之间用. 分割.
通配符介绍:
*: 只匹配一个单词
#: 匹配一个或多个单词
6. RPC 模型
这种模式需要通知远程计算机运行功能并等待返回运行结果. 这个过程是阻塞的.
当客户端启动时, 它创建一个匿名独占回调队列. 并提供名字为 call 的函数, 这个 call 会发送 RPC 请求并且阻塞直到收到 RPC 运算的结果.
Spring Boot 整合 RabbitMQ
第一步: 引入 pom 依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
第二步: 增加 RabbitMQ 服务配置信息
- spring:
- rabbitmq:
- virtual-host: javatrip
- port: 5672
- host: 127.0.0.1
- username: guest
- password: guest
这里我们用广播模型来举例使用, 广播模型 (fanout) 比较好理解, 就像公众号一样, 我每天推文章后, 会推送给每个关注用户, 他们都可以看到这条消息.
广播模型注意点:
可以有多个队列
每个队列都需要绑定交换机
每个消费者有自己的队列
交换机把消息发送给绑定过的所有队列
1. 定义两个队列
- @Configuration
- public class RabbitConfig {
- final static String queueNameA = "first-queue";
- final static String queueNameB = "second-queue";
- /***
- * 定义一个队列, 设置队列属性
- * @return
- */
- @Bean("queueA")
- public Queue queueA(){
- Map<String,Object> map = new HashMap<>();
- // 消息过期时长, 10 秒过期
- map.put("x-message-ttl",10000);
- // 队列中最大消息条数, 10 条
- map.put("x-max-length",10);
- // 第一个参数, 队列名称
- // 第二个参数, durable: 持久化
- // 第三个参数, exclusive: 排外的,
- // 第四个参数, autoDelete: 自动删除
- Queue queue = new Queue(queueNameA,true,false,false,map);
- return queue;
- }
- @Bean("queueB")
- public Queue queueB(){
- Map<String,Object> map = new HashMap<>();
- // 消息过期时长, 10 秒过期
- map.put("x-message-ttl",10000);
- // 队列中最大消息条数, 10 条
- map.put("x-max-length",10);
- // 第一个参数, 队列名称
- // 第二个参数, durable: 持久化
- // 第三个参数, exclusive: 排外的,
- // 第四个参数, autoDelete: 自动删除
- Queue queue = new Queue(queueNameB,true,false,false,map);
- return queue;
- }
- }
2. 定义扇形交换机
- @Bean
- public FanoutExchange fanoutExchange(){
- // 第一个参数, 交换机名称
- // 第二个参数, durable, 是否持久化
- // 第三个参数, autoDelete, 是否自动删除
- FanoutExchange fanoutExchange = new FanoutExchange(exchangeName,true,false);
- return fanoutExchange;
- }
3. 交换机和队列绑定
- @Bean
- public Binding bindingA(@Qualifier("queueA") Queue queueA, FanoutExchange fanoutExchange){
- Binding binding = BindingBuilder.bind(queueA).to(fanoutExchange);
- return binding;
- }
- @Bean
- public Binding bindingB(@Qualifier("queueB") Queue queueB,FanoutExchange fanoutExchange){
- Binding binding = BindingBuilder.bind(queueB).to(fanoutExchange);
- return binding;
- }
4. 创建两个消费者分别监听两个队列
- @RabbitListener(queues = RabbitConfig.queueNameA)
- @Component
- @Slf4j
- public class ConsumerA {
- @RabbitHandler
- public void receive(String message){
- log.info("消费者 A 接收到的消息:"+message);
- }
- }
- @RabbitListener(queues = RabbitConfig.queueNameB)
- @Component
- @Slf4j
- public class ConsumerB {
- @RabbitHandler
- public void receive(String message){
- log.info("消费者 B 接收到的消息:"+message);
- }
- }
5. 创建生产者生产消息
- @RestController
- public class provider {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @GetMapping("send")
- public void sendMessage(){
- String message = "你好, 我是 Java 旅途";
- rabbitTemplate.convertAndSend(RabbitConfig.exchangeName,null,message);
- }
- }
这样生产者发送一条消息后, 两个消费者就能同时消费到消息了.
此是 spring-boot-route 系列的第十三篇文章, 这个系列的文章都比较简单, 主要目的就是为了帮助初次接触 Spring Boot 的同学有一个系统的认识. 本文已收录至我的 https://github.com/binzh303/spring-boot-route , 欢迎各位小伙伴 star!
GitHub: https://github.com/binzh303/spring-boot-route
点关注, 不迷路
来源: https://segmentfault.com/a/1190000037438017