综合概述
消息队列
消息队列就是一个消息的链表, 可以把消息看作一个记录, 具有特定的格式以及特定的优先级. 对消息队列有写权限的进程可以向消息队列中按照一定的规则添加新消息, 对消息队列有读权限的进程则可以从消息队列中读走消息, 而消息队列就是在消息的传输过程中保存消息的容器, 你可以简单的把消息队列理解为类似快递柜, 快递员 (消息发布者) 往快递柜 (消息队列) 投递物件 (消息), 接受者(消息订阅者) 从快递柜 (消息队列) 接收物件(消息), 当然消息队列往往还包含一些特定的消息传递和接收机制.
消息队列作为分布式系统中重要的组件, 可以有效解决应用耦合, 异步消息, 流量削锋等系列问题, 有利于实现高性能, 高可用, 可伸缩和最终一致性架构. 目前使用较多的消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 等, 各种消息队列也都各有特点, 比如 Kafka 提供高性能, 高吞吐量, 但可靠性有所欠缺, 所以比较适合像日志处理这类对性能要求高但对可靠性要求没那么严格的业务, 再比如 RabbitMQ 支持了各种协议, 实现较为臃肿, 性能和吞吐量都一般, 但却提供了很好的可靠性, 比较适合像银行金融一类对可靠性要求较高的业务.
应用场景
以下简单介绍几个消息队列在实际应用中的使用场景(以下场景资料引用自网络).
1 异步处理
场景说明: 用户注册后, 需要发注册邮件和注册短信. 传统的做法有两种 1. 串行的方式; 2. 并行方式
(1)串行方式: 将注册信息写入数据库成功后, 发送注册邮件, 再发送注册短信. 以上三个任务全部完成后, 返回给客户端
(2)并行方式: 将注册信息写入数据库成功后, 发送注册邮件的同时, 发送注册短信. 以上三个任务完成后, 返回给客户端. 与串行的差别是, 并行的方式可以提高处理的时间
假设三个业务节点每个使用 50 毫秒钟, 不考虑网络等其他开销, 则串行方式的时间是 150 毫秒, 并行的时间可能是 100 毫秒.
因为 CPU 在单位时间内处理的请求数是一定的, 假设 CPU1 秒内吞吐量是 100 次. 则串行方式 1 秒内 CPU 可处理的请求量是 7 次(1000/150). 并行方式处理的请求量是 10 次(1000/100)
小结: 如以上案例描述, 传统的方式系统的性能 (并发量, 吞吐量, 响应时间) 会有瓶颈. 如何解决这个问题呢?
引入消息队列, 将不是必须的业务逻辑, 异步处理. 改造后的架构如下:
按照以上约定, 用户的响应时间相当于是注册信息写入数据库的时间, 也就是 50 毫秒. 注册邮件, 发送短信写入消息队列后, 直接返回, 因此写入消息队列的速度很快, 基本可以忽略, 因此用户的响应时间可能是 50 毫秒. 因此架构改变后, 系统的吞吐量提高到每秒 20 QPS. 比串行提高了 3 倍, 比并行提高了两倍
2 应用解耦
场景说明: 用户下单后, 订单系统需要通知库存系统. 传统的做法是, 订单系统调用库存系统的接口. 如下图
传统模式的缺点:
假如库存系统无法访问, 则订单减库存将失败, 从而导致订单失败
订单系统与库存系统耦合
如何解决以上问题呢? 引入应用消息队列后的方案, 如下图:
订单系统: 用户下单后, 订单系统完成持久化处理, 将消息写入消息队列, 返回用户订单下单成功
库存系统: 订阅下单的消息, 采用拉 / 推的方式, 获取下单信息, 库存系统根据下单信息, 进行库存操作
假如: 在下单时库存系统不能正常使用. 也不影响正常下单, 因为下单后, 订单系统写入消息队列就不再关心其他的后续操作了. 实现订单系统与库存系统的应用解耦
3 流量削锋
流量削锋也是消息队列中的常用场景, 一般在秒杀或团抢活动中使用广泛
应用场景: 秒杀活动, 一般会因为流量过大, 导致流量暴增, 应用挂掉. 为解决这个问题, 一般需要在应用前端加入消息队列.
可以控制活动的人数
可以缓解短时间内高流量压垮应用
用户的请求, 服务器接收后, 首先写入消息队列. 假如消息队列长度超过最大数量, 则直接抛弃用户请求或跳转到错误页面
秒杀业务根据消息队列中的请求信息, 再做后续处理
4 日志处理
日志处理是指将消息队列用在日志处理中, 比如 Kafka 的应用, 解决大量日志传输的问题. 架构简化如下
日志采集客户端, 负责日志数据采集, 定时写受写入 Kafka 队列
Kafka 消息队列, 负责日志数据的接收, 存储和转发
日志处理应用: 订阅并消费 kafka 队列中的日志数据
以下是新浪 kafka 日志处理应用案例:
(1)Kafka: 接收用户日志的消息队列
(2)Logstash: 做日志解析, 统一成 JSON 输出给 Elasticsearch
(3)Elasticsearch: 实时日志分析服务的核心技术, 一个 schemaless, 实时的数据存储服务, 通过 index 组织数据, 兼具强大的搜索和统计功能
(4)Kibana: 基于 Elasticsearch 的数据可视化组件, 超强的数据可视化能力是众多公司选择 ELK stack 的重要原因
5 消息通讯
消息通讯是指, 消息队列一般都内置了高效的通信机制, 因此也可以用在纯的消息通讯. 比如实现点对点消息队列, 或者聊天室等
点对点通讯:
客户端 A 和客户端 B 使用同一队列, 进行消息通讯.
聊天室通讯:
客户端 A, 客户端 B, 客户端 N 订阅同一主题, 进行消息发布和接收. 实现类似聊天室效果.
以上实际是消息队列的两种消息模式, 点对点或发布订阅模式. 模型为示意图, 供参考.
Rabbit MQ
AMQP, 即 Advanced Message Queuing Protocol, 高级消息队列协议, 是应用层协议的一个开放标准, 为面向消息的中间件设计. 消息中间件主要用于组件之间的解耦和通讯. AMQP 的主要特征是面向消息, 队列, 路由(包括点对点和发布 / 订阅), 可靠性和安全.
RabbitMQ 是一个开源的 AMQP 实现, 服务器端用 Erlang 语言编写, 支持多种客户端, 如: Java,Python,Ruby,.NET,JMS,C,PHP,ActionScript,XMPP,STOMP 等, 支持 Ajax. 用于在分布式系统中存储转发消息, 具有很高的易用性和可用性.
接下来, 我们先来了解几个相关概念(以下相关介绍资料引用自网络).
ConnectionFactory,Connection,Channel
ConnectionFactory,Connection,Channel 都是 RabbitMQ 对外提供的 API 中最基本的对象. Connection 是 RabbitMQ 的 socket 链接, 它封装了 socket 协议相关部分逻辑. ConnectionFactory 为 Connection 的制造工厂. Channel 是我们与 RabbitMQ 打交道的最重要的一个接口, 我们大部分的业务操作是在 Channel 这个接口中完成的, 包括定义 Queue, 定义 Exchange, 绑定 Queue 与 Exchange, 发布消息等.
Queue
Queue(队列)是 RabbitMQ 的内部对象, 用于存储消息.
RabbitMQ 中的消息都只能存储在 Queue 中, 生产者 (下图中的 P) 生产消息并最终投递到 Queue 中, 消费者 (下图中的 C) 可以从 Queue 中获取消息并消费.
生产者 Send Message "A" 被传送到 Queue 中, 消费者发现消息队列 Queue 中有订阅的消息, 就会将这条消息 A 读取出来进行一些列的业务操作. 这里只是一个消费正对应一个队列 Queue, 也可以多个消费者订阅同一个队列 Queue, 当然这里就会将 Queue 里面的消息平分给其他的消费者, 但是会存在一个一个问题就是如果每个消息的处理时间不同, 就会导致某些消费者一直在忙碌中, 而有的消费者处理完了消息后一直处于空闲状态, 因为前面已经提及到了 Queue 会平分这些消息给相应的消费者. 这里我们就可以使用 prefetchCount 来限制每次发送给消费者消息的个数. 详情见下图所示:
这里的 prefetchCount=1 是指每次从 Queue 中发送一条消息来. 等消费者处理完这条消息后 Queue 会再发送一条消息给消费者.
Message acknowledgment
在实际应用中, 可能会发生消费者收到 Queue 中的消息, 但没有处理完成就宕机 (或出现其他意外) 的情况, 这种情况下就可能会导致消息丢失. 为了避免这种情况发生, 我们可以要求消费者在消费完消息后发送一个回执给 RabbitMQ,RabbitMQ 收到消息回执 (Message acknowledgment) 后才将该消息从 Queue 中移除; 如果 RabbitMQ 没有收到回执并检测到消费者的 RabbitMQ 连接断开, 则 RabbitMQ 会将该消息发送给其他消费者 (如果存在多个消费者) 进行处理. 这里不存在 timeout 概念, 一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者, 除非它的 RabbitMQ 连接断开. 这里会产生另外一个问题, 如果我们的开发人员在处理完业务逻辑后, 忘记发送回执给 RabbitMQ, 这将会导致严重的 bug--Queue 中堆积的消息会越来越多; 消费者重启后会重复消费这些消息并重复执行业务逻辑...
另外 pub message 是没有 ack 的.
Message durability
如果我们希望即使在 RabbitMQ 服务重启的情况下, 也不会丢失消息, 我们可以将 Queue 与 Message 都设置为可持久化的(durable), 这样可以保证绝大部分情况下我们的 RabbitMQ 消息不会丢失. 但依然解决不了小概率丢失事件的发生(比如 RabbitMQ 服务器已经接收到生产者的消息, 但还没来得及持久化该消息时 RabbitMQ 服务器就断电了), 如果我们需要对这种小概率事件也要管理起来, 那么我们要用到事务. 由于这里仅为 RabbitMQ 的简单介绍, 所以这里将不讲解 RabbitMQ 相关的事务.
Exchange
首先明确一点就是生产者产生的消息并不是直接发送给消息队列 Queue 的, 而是要经过 Exchange(交换器), 由 Exchange 再将消息路由到一个或多个 Queue, 当然这里还会对不符合路由规则的消息进行丢弃掉, 这里指的是后续要谈到的 Exchange Type. 那么 Exchange 是怎样将消息准确的推送到对应的 Queue 的呢? 那么这里的功劳最大的当属 Binding,RabbitMQ 是通过 Binding 将 Exchange 和 Queue 链接在一起, 这样 Exchange 就知道如何将消息准确的推送到 Queue 中去. 简单示意图如下所示:
在绑定(Binding)Exchange 和 Queue 的同时, 一般会指定一个 Binding Key, 生产者将消息发送给 Exchange 的时候, 一般会产生一个 Routing Key, 当 Routing Key 和 Binding Key 对应上的时候, 消息就会发送到对应的 Queue 中去. 那么 Exchange 有四种类型, 不同的类型有着不同的策略. 也就是表明不同的类型将决定绑定的 Queue 不同, 换言之就是说生产者发送了一个消息, Routing Key 的规则是 A, 那么生产者会将 Routing Key=A 的消息推送到 Exchange 中, 这时候 Exchange 中会有自己的规则, 对应的规则去筛选生产者发来的消息, 如果能够对应上 Exchange 的内部规则就将消息推送到对应的 Queue 中去. 那么接下来就来详细讲解下 Exchange 里面类型.
- Exchange Types
- fanout
fanout 类型的 Exchange 路由规则非常简单, 它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中.
上图所示, 生产者 (P) 生产消息 1 将消息 1 推送到 Exchange, 由于 Exchange Type=fanout 这时候会遵循 fanout 的规则将消息推送到所有与它绑定 Queue, 也就是图上的两个 Queue 最后两个消费者消费.
direct
direct 类型的 Exchange 路由规则也很简单, 它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中
当生产者 (P) 发送消息时 Rotuing key=booking 时, 这时候将消息传送给 Exchange,Exchange 获取到生产者发送过来消息后, 会根据自身的规则进行与匹配相应的 Queue, 这时发现 Queue1 和 Queue2 都符合, 就会将消息传送给这两个队列, 如果我们以 Rotuing key=create 和 Rotuing key=confirm 发送消息时, 这时消息只会被推送到 Queue2 队列中, 其他 Routing Key 的消息将会被丢弃.
topic
前面提到的 direct 规则是严格意义上的匹配, 换言之 Routing Key 必须与 Binding Key 相匹配的时候才将消息传送给 Queue, 那么 topic 这个规则就是模糊匹配, 可以通过通配符满足一部分规则就可以传送. 它的约定是:
routing key 为一个句点号 "." 分隔的字符串(我们将被句点号 "." 分隔开的每一段独立的字符串称为一个单词), 如 "stock.usd.nyse","nyse.vmw","quick.orange.rabbit"
binding key 与 routing key 一样也是句点号 "." 分隔的字符串
binding key 中可以存在两种特殊字符 "*" 与 "#", 用于做模糊匹配, 其中 "*" 用于匹配一个单词,"#" 用于匹配多个单词(可以是零个)
当生产者发送消息 Routing Key=F.C.E 的时候, 这时候只满足 Queue1, 所以会被路由到 Queue 中, 如果 Routing Key=A.C.E 这时候会被同是路由到 Queue1 和 Queue2 中, 如果 Routing Key=A.F.B 时, 这里只会发送一条消息到 Queue2 中.
headers
headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息, 而是根据发送的消息内容中的 headers 属性进行匹配.
在绑定 Queue 与 Exchange 时指定一组键值对; 当消息发送到 Exchange 时, RabbitMQ 会取到该消息的 headers(也是一个键值对的形式), 对比其中的键值对是否完全匹配 Queue 与 Exchange 绑定时指定的键值对; 如果完全匹配则消息会路由到该 Queue, 否则不会路由到该 Queue.
实现案例
首先, 需要安装 Rabbit MQ, 可以直接安装, 也可以用 Docker 安装, 这个网上教程很多, 这里就不再赘述了.
生成项目模板
为方便我们初始化项目, Spring Boot 给我们提供一个项目模板生成网站.
1. 打开浏览器, 访问: https://start.spring.io/
2. 根据页面提示, 选择构建工具, 开发语言, 项目信息等.
3. 点击 Generate the project, 生成项目模板, 生成之后会将压缩包下载到本地.
4. 使用 IDE 导入项目, 我这里使用 Eclipse, 通过导入 Maven 项目的方式导入.
添加相关依赖
清理掉不需要的测试类及测试依赖, 添加 rabbitmq 相关依赖.
- <!-- rabbitmq -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
下面给出完整的 POM 文件.
pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.1.5.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>com.louis.springboot</groupId>
- <artifactId>demo</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>demo</name>
- <description>Demo project for Spring Boot</description>
- <properties>
- <java.version>1.8</java.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <!-- web -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-Web</artifactId>
- </dependency>
- <!-- swagger -->
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger2</artifactId>
- <version>2.9.2</version>
- </dependency>
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger-ui</artifactId>
- <version>2.9.2</version>
- </dependency>
- <!-- rabbitmq -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </project>
添加相关配置
添加一个 swagger 配置类, 在工程下新建 config 包并添加一个 SwaggerConfig 配置类.
SwaggerConfig.java
- package com.louis.springboot.demo.config;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import springfox.documentation.builders.ApiInfoBuilder;
- import springfox.documentation.builders.PathSelectors;
- import springfox.documentation.builders.RequestHandlerSelectors;
- import springfox.documentation.service.ApiInfo;
- import springfox.documentation.spi.DocumentationType;
- import springfox.documentation.spring.Web.plugins.Docket;
- import springfox.documentation.swagger2.annotations.EnableSwagger2;
- @Configuration
- @EnableSwagger2
- public class SwaggerConfig {
- @Bean
- public Docket createRestApi(){
- return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo())
- .select()
- .apis(RequestHandlerSelectors.any())
- .paths(PathSelectors.any()).build();
- }
- private ApiInfo apiInfo(){
- return new ApiInfoBuilder()
- .title("Swagger API Doc")
- .description("This is a restful api document of Swagger.")
- .version("1.0")
- .build();
- }
- }
修改 application.properties 文件名为 application.YAML, 在其中添加 RabbitMQ 配置信息, 根据自己安装的 RabbitMQ 配置.
application.YAML
- # rabbitmq 配置
- spring:
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: guest
- password: guest
普通队列模式
新建一个 RabbitMQ 配置类, 并添加一个 demoQueue 队列.
RabbitConfig.java
- package com.louis.springboot.demo.config;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class RabbitConfig {
- /**
- * 定义 demoQueue 队列
- * @return
- */
- @Bean
- public Queue demoString() {
- return new Queue("demoQueue");
- }
- }
编写一个消息发布者, 并编写一个发送方法, 通过 AmqpTemplate 往 "demoQueue" 发送消息.
RabbitProducer.java
- package com.louis.springboot.demo.mq;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- @Component
- public class RabbitProducer {
- @Autowired
- private AmqpTemplate rabbitTemplate;
- public void sendDemoQueue() {
- Date date = new Date();
- String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
- System.out.println("[demoQueue] send msg:" + dateString);
- // 第一个参数为刚刚定义的队列名称
- this.rabbitTemplate.convertAndSend("demoQueue", dateString);
- }
- }
编写一个消息消费者, 通过 @RabbitListener(queues = "demoQueue")注解监听 "demoQueue" 队列, 并用 @RabbitHandler 注解相关方法, 这样在在队列收到消息之后, 交友 @RabbitHandler 注解的方法进行处理.
DemoQueueConsumer.java
- package com.louis.springboot.demo.mq;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- @RabbitListener(queues = "demoQueue")
- public class DemoQueueConsumer {
- /**
- * 消息消费
- * @RabbitHandler 代表此方法为接受到消息后的处理方法
- */
- @RabbitHandler
- public void recieved(String msg) {
- System.out.println("[demoQueue] recieved message:" + msg);
- }
- }
编写一个控制器, 注入 RabbitProducer 调用相关消息发送方法, 方便通过接口触发消息发送.
RabbitMqController.java
- package com.louis.springboot.demo.controller;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.Web.bind.annotation.GetMapping;
- import org.springframework.Web.bind.annotation.RestController;
- import com.louis.springboot.demo.mq.RabbitProducer;
- @RestController
- public class RabbitMqController {
- @Autowired
- private RabbitProducer rabbitProducer;
- @GetMapping("/sendDemoQueue")
- public Object sendDemoQueue() {
- rabbitProducer.sendDemoQueue();
- return "success";
- }
- }
编译并启动应用, 打开浏览器, 访问: http://localhost:8080/swagger-ui.HTML, 进入 swagger 接口文档界面.
调用两次 sendDemoQueue 接口, 在控制台可以看到我们输出的信息, 说明消息已经成功发送并被消费.
- [demoQueue] send msg: 2019-58-183 04:07:38
- [demoQueue] recieved message: 2019-58-183 04:07:38
- [demoQueue] send msg: 2019-01-183 05:07:05
- [demoQueue] recieved message: 2019-01-183 05:07:05
Fanout 广播模式
Fanout 其实就是广播模式, 只要跟它绑定的队列都会通知并且接受到消息. 修改配置类, 在 RabbitConfig 中添加如下 fanout 模式的队列跟交换机信息. 在代码中我们配置了三个队列名, 一个 fanout 交换机, 并且将这三个队列绑定到了 fanout 交换器上. 只要我们往这个交换机生产新的消息, 那么这三个队列都会收到.
RabbitConfig.java
- //=================== fanout 广播模式 ====================
- @Bean
- public Queue fanoutA() {
- return new Queue("fanout.a");
- }
- @Bean
- public Queue fanoutB() {
- return new Queue("fanout.b");
- }
- @Bean
- public Queue fanoutC() {
- return new Queue("fanout.c");
- }
- /**
- * 定义个 fanout 交换器
- * @return
- */
- @Bean
- FanoutExchange fanoutExchange() {
- // 定义一个名为 fanoutExchange 的 fanout 交换器
- return new FanoutExchange("fanoutExchange");
- }
- /**
- * 将定义的 fanoutA 队列与 fanoutExchange 交换机绑定
- * @return
- */
- @Bean
- public Binding bindingExchangeWithA() {
- return BindingBuilder.bind(fanoutA()).to(fanoutExchange());
- }
- /**
- * 将定义的 fanoutB 队列与 fanoutExchange 交换机绑定
- * @return
- */
- @Bean
- public Binding bindingExchangeWithB() {
- return BindingBuilder.bind(fanoutB()).to(fanoutExchange());
- }
- /**
- * 将定义的 fanoutC 队列与 fanoutExchange 交换机绑定
- * @return
- */
- @Bean
- public Binding bindingExchangeWithC() {
- return BindingBuilder.bind(fanoutC()).to(fanoutExchange());
- }
然后我们在 RabbitProducer 中添加一个 sendFanout 方法, 用来向 fanout 队列发送消息.
RabbitProducer.java
- public void sendFanout() {
- Date date = new Date();
- String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
- System.out.println("[fanout] send msg:" + dateString);
- // 注意 第一个参数是我们交换机的名称 , 第二个参数是 routerKey 我们不用管空着就可以, 第三个是你要发送的消息
- this.rabbitTemplate.convertAndSend("fanoutExchange", "", dateString);
- }
同样的, 在控制器里添加一个访问接口.
RabbitMqController.java
- @GetMapping("/sendFanout")
- public Object sendFanout() {
- rabbitProducer.sendFanout();
- return "success";
- }
接着针对三个广播队列分别编写一个消息消费者, 指定队列和处理函数.
FanoutAConsumer.java
- package com.louis.springboot.demo.mq;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- @RabbitListener(queues = "fanout.a")
- public class FanoutAConsumer {
- /**
- * 消息消费
- * @RabbitHandler 代表此方法为接受到消息后的处理方法
- */
- @RabbitHandler
- public void recieved(String msg) {
- System.out.println("[fanout.a] recieved message:" + msg);
- }
- }
FanoutBConsumer.java
- package com.louis.springboot.demo.mq;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- @RabbitListener(queues = "fanout.b")
- public class FanoutBConsumer {
- /**
- * 消息消费
- * @RabbitHandler 代表此方法为接受到消息后的处理方法
- */
- @RabbitHandler
- public void recieved(String msg) {
- System.out.println("[fanout.b] recieved message:" + msg);
- }
- }
FanoutCConsumer.java
- package com.louis.springboot.demo.mq;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- @RabbitListener(queues = "fanout.c")
- public class FanoutCConsumer {
- /**
- * 消息消费
- * @RabbitHandler 代表此方法为接受到消息后的处理方法
- */
- @RabbitHandler
- public void recieved(String msg) {
- System.out.println("[fanout.c] recieved message:" + msg);
- }
- }
重新启动应用, 调用 sendFanout 接口, 通过控制台可以看到消息发送之后, a, b, c 三个队列都收到了消息.
- [fanout] send msg:2019-47-183 05:07:12
- [fanout.c] recieved message: 2019-47-183 05:07:12
- [fanout.b] recieved message: 2019-47-183 05:07:12
- [fanout.a] recieved message: 2019-47-183 05:07:12
Topic 主题模式
利用 topic 模式可以实现模糊匹配, 同样的, 在 RabbitConfig 中配置 topic 队列跟交换器, 注意的是这里需要多配置一个 bindingKey.
RabbitConfig.java
- //=================== topic 主题模式 ====================
- @Bean
- public Queue topiocA() {
- return new Queue("topic.a");
- }
- @Bean
- public Queue topicB() {
- return new Queue("topic.b");
- }
- @Bean
- public Queue topicC() {
- return new Queue("topic.c");
- }
- /**
- * 定义个 topic 交换器
- * @return
- */
- @Bean
- TopicExchange topicExchange() {
- // 定义一个名为 fanoutExchange 的 fanout 交换器
- return new TopicExchange("topicExchange");
- }
- /**
- * 将定义的 topicA 队列与 topicExchange 交换机绑定
- * @return
- */
- @Bean
- public Binding bindingTopicExchangeWithA() {
- return BindingBuilder.bind(topiocA()).to(topicExchange()).with("topic.msg");
- }
- /**
- * 将定义的 topicB 队列与 topicExchange 交换机绑定
- * @return
- */
- @Bean
- public Binding bindingTopicExchangeWithB() {
- return BindingBuilder.bind(topicB()).to(topicExchange()).with("topic.#");
- }
- /**
- * 将定义的 topicC 队列与 topicExchange 交换机绑定
- * @return
- */
- @Bean
- public Binding bindingTopicExchangeWithC() {
- return BindingBuilder.bind(topicC()).to(topicExchange()).with("topic.*.z");
- }
上述配置中:
topicA 的 key 为 topic.msg 那么他只会接收包含 topic.msg 的消息
topicB 的 key 为 topic.# 那么他只会接收 topic 开头的消息
topicC 的 key 为 topic.*.z 那么他只会接收 topic.x.z 这样格式的消息
然后修改 RabbitProducer, 在其中添加如下三个方法, 如方法名所示, 分别根据匹配规则发送到 A\B,B,B\C 队列.
RabbitProducer.java
- public void sendTopicTopicAB() {
- Date date = new Date();
- String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
- dateString = "[topic.msg] send msg:" + dateString;
- System.out.println(dateString);
- // 注意 第一个参数是我们交换机的名称 , 第二个参数是 routerKey topic.msg, 第三个是你要发送的消息
- // 这条信息将会被 topic.a topic.b 接收
- this.rabbitTemplate.convertAndSend("topicExchange", "topic.msg", dateString);
- }
- public void sendTopicTopicB() {
- Date date = new Date();
- String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
- dateString = "[topic.good.msg] send msg:" + dateString;
- System.out.println(dateString);
- // 注意 第一个参数是我们交换机的名称 , 第二个参数是 routerKey , 第三个是你要发送的消息
- // 这条信息将会被 topic.b 接收
- this.rabbitTemplate.convertAndSend("topicExchange", "topic.good.msg", dateString);
- }
- public void sendTopicTopicBC() {
- Date date = new Date();
- String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
- dateString = "[topic.m.z] send msg:" + dateString;
- System.out.println(dateString);
- // 注意 第一个参数是我们交换机的名称 , 第二个参数是 routerKey , 第三个是你要发送的消息
- // 这条信息将会被 topic.b,topic.c 接收
- this.rabbitTemplate.convertAndSend("topicExchange", "topic.m.z", dateString);
- }
同样的, 在控制器里面添加发送服务对应的接口.
RabbitMqController.java
- @GetMapping("/sendTopicTopicAB")
- public Object sendTopicTopicAB() {
- rabbitProducer.sendTopicTopicAB();
- return "success";
- }
- @GetMapping("/sendTopicTopicB")
- public Object sendTopicTopicB() {
- rabbitProducer.sendTopicTopicB();
- return "success";
- }
- @GetMapping("/sendTopicTopicBC")
- public Object sendTopicTopicBC() {
- rabbitProducer.sendTopicTopicBC();
- return "success";
- }
接着针对三个主题队列编写对应的消息消费者.
TopicAConsumer.java
- package com.louis.springboot.demo.mq;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- @RabbitListener(queues = "topic.a")
- public class TopicAConsumer {
- /**
- * 消息消费
- * @RabbitHandler 代表此方法为接受到消息后的处理方法
- */
- @RabbitHandler
- public void recieved(String msg) {
- System.out.println("[topic.a] recieved message:" + msg);
- }
- }
TopicBConsumer.java
- package com.louis.springboot.demo.mq;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- @RabbitListener(queues = "topic.b")
- public class TopicBConsumer {
- /**
- * 消息消费
- * @RabbitHandler 代表此方法为接受到消息后的处理方法
- */
- @RabbitHandler
- public void recieved(String msg) {
- System.out.println("[topic.b] recieved message:" + msg);
- }
- }
TopicCConsumer.java
- package com.louis.springboot.demo.mq;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- @RabbitListener(queues = "topic.c")
- public class TopicCConsumer {
- /**
- * 消息消费
- * @RabbitHandler 代表此方法为接受到消息后的处理方法
- */
- @RabbitHandler
- public void recieved(String msg) {
- System.out.println("[topic.c] recieved message:" + msg);
- }
- }
重启应用, 调用 sendTopicTopicAB 接口, 经过匹配, route key 为 "topic.msg" 的消息被发送到了 topic.a 和 topic.b.
- [topic.msg] send msg:2019-12-183 06:07:22
- [topic.b] recieved message:[topic.msg] send msg:2019-12-183 06:07:22
- [topic.a] recieved message:[topic.msg] send msg:2019-12-183 06:07:22
调用 sendTopicTopicB 接口, 经过匹配, route key 为 "topic.good.msg" 的消息被发送到了 topic.b.
- [topic.good.msg] send msg:2019-15-183 06:07:23
- [topic.b] recieved message:[topic.good.msg] send msg:2019-15-183 06:07:23
调用 sendTopicTopicBC 接口, 经过匹配, route key 为 "topic.m.z" 的消息被发送到了 topic.b 和 topic.c.
- [topic.m.z] send msg:2019-16-183 06:07:09
- [topic.b] recieved message:[topic.m.z] send msg:2019-16-183 06:07:09
- [topic.c] recieved message:[topic.m.z] send msg:2019-16-183 06:07:09
参考资料
官方网站: https://www.rabbitmq.com/
百度百科: https://baike.baidu.com/item/rabbitmq/9372144?fr=aladdin
中文教程: http://rabbitmq.mr-ping.com/description.html
相关导航
Spring Boot 系列教程目录导航
Spring Boot: 快速入门教程
Spring Boot: 整合 Swagger 文档
Spring Boot: 整合 MyBatis 框架
Spring Boot: 实现 MyBatis 分页
源码下载
码云: https://gitee.com/liuge1988/spring-boot-demo.git
来源: https://www.cnblogs.com/xifengxiaoma/p/11121355.html