目录
1, 简单概述 RabbitMQ 重要作用
2, 简单概述 RabbitMQ 重要概念
3,Spring Boot 整合 RabbitMQ
前言
RabbitMQ 是一个消息队列, 主要是用来实现应用程序的异步和解耦, 同时也能起到消息缓冲, 消息分发的作用. 消息中间件最主要的作用还是解耦, 中间件最标准的用法是生产者生产消息传送到队列, 消费者从队列中拿取消息并处理, 生产者不用关心是谁来消费, 消费者不用关心谁在生产消息, 从而达到解耦的目的. 在分布式的系统中, 消息队列也会被用在很多其它的方面, 比如: 分布式事务的支持, RPC 的调用等等.
@
1, 简单概述 RabbitMQ 重要作用
首先谈谈作用, 你知道它有啥用才会有兴趣去知道它的重要概念, 走进它, 亲近它! 上面已经提过了 RabbitMQ 主要是用来实现程序的异步和解耦. 这里也主要讲解它是如何做到异步和解耦的.
1.1, 异步
对比一下使用消息队列实现异步的好处:
1.2, 解耦
至于解耦只能靠自己的对耦合的理解, 这里就以文字的形式概述:
以上面消息队列实现异步场景分析: 主线程依旧处理耗时低的入库操作, 然后把需要处理的消息写进消息队列中, 这个写入耗时可以忽略不计, 非常快, 然后, 独立的发邮件子系统, 和独立的发短信子系统, 同时订阅消息队列, 进行单独处理. 处理好之后, 向队列发送 ACK 确认, 消息队列整条数据删除. 这个流程也是现在各大公司都在用的方式, 以 SOA 服务化各个系统, 把耗时操作, 单独交给独立的业务系统, 通过消息队列作为中间件, 达到应用解耦的目的, 并且消耗的资源很低, 单台服务器能承受更大的并发请求.
到这里, 经过一段存文字的熏陶, 估计各位已经一脸懵逼了, 你们个个都是人才做到一脸懵逼的看完, 阿姨都忍不住给你喊 666....
2, 简单概述 RabbitMQ 重要概念
首先, RabbitMQ 是消息中间件的一种, 类似的还有 ActiveMQ,RocketMQ.... 总的来说这些消息中间件都泛指的就是分布式系统中完成消息的发送和接收的基础软件.
接下来重点来了...
消息中间件工作过程 == 生产者消费者模型
因此对于消息队列来说最重要的三个概念就是: 生产者, 消息队列, 消费者
2.1,RabitMQ 的工作流程
上面主要对消息队列做了一个共性分析, 对于 RabbitMQ 消息队列来说, 除了这三个重要概念以外, 还有一个很重要的概念就是交换机 (Exchange). 交换机使得生产者和消息队列之间产生了隔离, 生产者将消息发送给交换机, 而交换机则根据调度策略把相应的消息转发给对应的消息队列.
因此对于 RabbitMQ 来说最重要的四个概念就是: 生产者, 消息队列, 消费者, 交换机
总的来说, RabitMQ 的工作流程如下所示:
而具体的交换机如下会讲到.
2.2, 交换机
对交换机的通俗易懂说法就是:
交换机好比快递公司派发快递, 是百世快递就给百世快递小哥, 是申通快递就派发给申通快递小哥, 是圆通快递就派发给圆通快递小哥, 而这些不同派发方式就是一种匹配规则, 实际上交换机有四种类型, 分别为 Direct,topic,headers,Fanout, 而这四种类型就好比四种不同的匹配规则, 交换机就类似这种意思.
上面已经提到了交换机有四种类型 Direct,topic,headers,Fanout, 而这也是一个重点, 下面简单概述一下这四种类型:
Direct 类型:[重点]
Direct 是 RabbitMQ 默认的交换机模式, 也是最简单的模式. 即创建消息队列的时候, 指定一个 BindingKey. 当发送者发送消息的时候, 指定对应的 Key. 当 Key 和消息队列的 BindingKey 一致的时候, 消息将会被发送到该消息队列中.
topic 类型:[重点]
topic 转发信息主要是依据通配符, 队列和交换机的绑定主要是依据一种模式 (通配符 + 字符串), 而当发送消息的时候, 只有指定的 Key 和该模式相匹配的时候, 消息才会被发送到该消息队列中. 比如 *.news 或者 #.news, 其中比如 #代表 0 到多个随机字符.
Fanout 类型:[重点]
Fanout 是路由广播的形式, 将会把消息发给绑定它的全部队列, 即便设置了 key, 也会被忽略.
headers 类型:
headers 也是根据一个规则进行匹配, 在消息队列和交换机绑定的时候会指定一组键值对规则, 而发送消息的时候也会指定一组键值对规则, 当两组键值对规则相匹配的时候, 消息会被发送到匹配的消息队列中.
2.3,RabbitMQ 关键概念总览简述
简单来说, RabbitMQ 关键概念如下:
1, 生产者: 发送消息的程序
2, 消费者: 监听接收消费消息的程序
3, 消息: 一串二进制数据流
4, 队列: 消息的暂存区 / 存储区
5, 交换机: 消息的中转站, 用于接收分发消息. 其中有 fanout,direct,topic,headers 四种类型
6, 路由 \ 键: 相当于密钥 / 第三者, 与交换机绑定即可路由消息到指定的队列!
而有了上面的这些概念之后, 再来了解了解消息模型的演变历程, 当然, 这一历程在 RabbitMQ 官网也是可以得知的, RabbitMQ 官网: https://www.rabbitmq.com/getstarted.html
以下图文转截于 https://blog.csdn.net/u013871100/article/details/82982235, 于此同时, 推荐各位去看看这篇文章, 写的真的不错.
好了到这里, RabbitMQ 就概述的差不多了, 接下来进行代码整合阶段.
3,Spring Boot 整合 RabbitMQ
Spring Boot 整合 RabbitMQ 原理分析
按照 SpringBoot 的常规套路, 估计可能存在一个 Rabbit 的 XXXAutoConfiguration, 全局检索一下, 毋庸置疑存在 RabbitAutoConfiguration, 那就来分析分析这个类.
1, 首先映入眼帘的就是自动配置了连接工厂的 ConnectionFactory
那么, 这个工程做了些什么呢? 进去看看其源代码
- public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties,
- ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
- PropertyMapper map = PropertyMapper.get();
- CachingConnectionFactory factory = new CachingConnectionFactory(
- getRabbitConnectionFactoryBean(properties).getObject());
- map.from(properties::determineAddresses).to(factory::setAddresses);
- map.from(properties::isPublisherReturns).to(factory::setPublisherReturns);
- map.from(properties::getPublisherConfirmType).whenNonNull().to(factory::setPublisherConfirmType);
- RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();
- map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize);
- map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis)
- .to(factory::setChannelCheckoutTimeout);
- RabbitProperties.Cache.Connection connection = properties.getCache().getConnection();
- map.from(connection::getMode).whenNonNull().to(factory::setCacheMode);
- map.from(connection::getSize).whenNonNull().to(factory::setConnectionCacheSize);
- map.from(connectionNameStrategy::getIfUnique).whenNonNull().to(factory::setConnectionNameStrategy);
- return factory;
- }
里面是一些连接配置的信息操作, 而这些配置信息的来源正是其参数类 RabbitProperties , 随之看看 RabbitProperties 这个配置类
既然是配置类, 那里面就是一些配置信息, 具体的可以通过配置文件以 spring.rabbitmq 的方式进行配置.
2,RabbitTemplate 类
再往下看就是 RabbitTemplate 类了, 该类的具体的位置在于
看其 RabbitTemplate 关键源码:
- @Override
- public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
- convertAndSend(exchange, routingKey, object, (CorrelationData) null);
- }
- ....
- @Override
- @Nullable
- public Object receiveAndConvert(String queueName) throws AmqpException {
- return receiveAndConvert(queueName, this.receiveTimeout);
- }
得知, RabbitTemplate 主要是提供 RabbitMQ 发送和接受消息的功能;
3,AmqpAdmin
再接下来往下看就是 AmqpAdmin 接口了
- @Bean
- @ConditionalOnSingleCandidate(ConnectionFactory.class)
- @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
- @ConditionalOnMissingBean
- public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
- return new RabbitAdmin(connectionFactory);
- }
点进 AmqpAdmin 接口, 观察其具体关键代码:
- boolean deleteQueue(String queueName);
- // Note that nowait option is not readily exposed in Rabbit Java API but is for Rabbit .NET API.
- /**
- * Delete a queue.
- * @param queueName the name of the queue.
- * @param unused true if the queue should be deleted only if not in use.
- * @param empty true if the queue should be deleted only if empty.
- */
- void deleteQueue(String queueName, boolean unused, boolean empty);
- /**
- * Purges the contents of the given queue.
- * @param queueName the name of the queue.
- * @param noWait true to not await completion of the purge.
- */
- void purgeQueue(String queueName, boolean noWait);
- /**
- * Purges the contents of the given queue.
- * @param queueName the name of the queue.
- * @return the number of messages purged.
- * @since 2.1
- */
- int purgeQueue(String queueName);
- // Binding operations
- /**
- * Declare a binding of a queue to an exchange.
- * @param binding a description of the binding to declare.
- */
- void declareBinding(Binding binding);
- /**
- * Remove a binding of a queue to an exchange. Note unbindQueue/removeBinding was not introduced until 0.9 of the
- * specification.
- * @param binding a description of the binding to remove.
- */
- void removeBinding(Binding binding);
- ...
发现 AmqpAdmin 是用来创建和删除 Queue,Exchange,Binding 等, 起到管理组件的作用.
差不多, 工厂里的代码就分析到这里.
3.1, 整合前准备 Rabbitmq 环境
在进行整合 RabbitMQ 前, 我们需要安装好 RabbitMQ 及其后端控制台应用, 并在项目中配置一下 RabbitMQ 的相关参数以及相关 Bean 组件. 我相信你们都安装了, 不然看个锤子整合文章啊....
RabbitMQ 安装完成后, 打开后端控制台应用:
http://192.168.42.142:15672
输入账户 guest 密码 guest 之后登录, 看到下图即表示安装成功
而且肯定要自己创建几个 Exchanges 和 Queues, 并且自行 Bing 上关系
准备好之后就可以整合测试效果了.
3.2, 搭建依赖环境
之后 pom.xml 的依赖如下:
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
3.3, 配置 application.properties 文件
然后是项目配置文件层面的配置 application.properties
- spring.rabbitmq.host=192.168.42.142
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
这里只是测试整合 RabbitMQ, 以上配置暂时足够了.
3.4, 编写 ApplicationTests 测试代码
编写代码之前, 我们要先知道一点, springboot 中提供了类似 jdbcTemplate 的模板, 也就是 RabbitTemplate , 作用理解起来就是一样的, 这里先提一下, 之后就可以使用其 RabbitTemplate 的强大功能方法.
- package com.yichun.rabbitmq;
- import org.junit.jupiter.API.Test;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.function.ObjIntConsumer;
- @SpringBootTest
- class RabbitmqApplicationTests {
- // 依赖注入 RabbitTemplate
- @Autowired
- RabbitTemplate rabbitTemplate;
- //fanout: 广播方式发送数据 : 与路由键 key 无关
- @Test
- void sendMsgs() {
- rabbitTemplate.convertAndSend("exchange.fanout",""," 你大爷还是你大爷 ");
- }
- //direct: 单播方式发送数据 : 与路由键 key 一一对应
- @Test
- void contextLoads() {
- Map<String, Object> map= new HashMap<>();
- map.put("1","hello, 熊 dei 在吗?");
- map.put("2", Arrays.asList("Tom",111));
- rabbitTemplate.convertAndSend("exchange.direct","yichun.news",map);
- }
- //Topic: 按指定路由键 Key 规则方式发送数据
- @Test
- void sendTopicMsgs() {
- rabbitTemplate.convertAndSend("exchange.topic","afdd.news","年轻人站起来嗷嗷嗷嗷... 奥利给!");
- }
- // 接收数据
- @Test
- void receive(){
- Object o = rabbitTemplate.receiveAndConvert("yichun.news");
- System.out.println(o.getClass());
- System.out.println(o);
- }
- }
单个单个单元测试运行
1,fanout 广播方式发送数据观察
2,Topic 指定路由键 Key 规则方式发送数据
3,direct 单播方式发送数据
打开信息发现如下:
这是什么情况呢? 其实这种情况主要是因为 Springboot 在
RabbitAutoConfiguration
中默认使用的消息转换规则, 要想看到想看到的数据格式, 比如 JSON 格式, 这个时候就要自定义转换规则了. 实际上面两次测试也是出现这种情况.
3.5, 自定义消息转换规则
首先要想自定义消息转换规则, 我们就要改变默认的规则, 首先打开 MessageConverter 接口, 如果是 eclipse 切换的快捷键, 就可以直接 F4, 查看它的实现继承的关系类, 如下图
是的, 分析发现确实有 JSON 的转换规则, 于是, 开始编写代码. 首先编写一个 MyMQConfig 类, 具体实现如下:
- @Configuration
- public class MyMQConfig {
- @Bean
- public MessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter(); // 让其返回一个 JSON 规则的转换
- }
- }
编写完之后, 再次运行之前的发送消息, 效果如下:
3.6,AmqpAdmin 管理组件的使用
在之前我们整合前的那些交换机, 队列, 绑定信息都是自己手动添加的, 实际上, Springboot 中 RabbitAutoConfiguration 中的 ConnectionFactory 工厂存在的 AmqpAdmin 就可以创建和删除 Queue,Exchange,Binding 等, 起到管理组件的作用.
==amqpAdmin 中凡是 declareXXX 的方法都是用于创建组件, 而以 remove 和 delete 开头就是删除组件, 这是一个小技巧. 接下来就以 declareXXX 的方法来创建各个组件进行分析, 至于删除就组件就直接 amqpAdmin. 组件名字就行了.==
1, 创建 Exchange
实例代码
- @Autowired
- AmqpAdmin amqpAdmin;
- @Test
- void clareExchange() {
- amqpAdmin.declareExchange(new DirectExchange("交换机名"));
- }
分析上面的代码, 首先是关于交换机 Exchange 的, 所以先全文检索一下 Exchange, 是个接口, 如下
其次既然是创建交换机 Exchange, 那肯定要写入创建交换机的名字了, 传入的参数 DirectExchange 中, 鼠标点进去, 发现
到这里, 分析的差不多了, 可以看出上面创建 Exchange 的方法就是最简单的创建一个叫 "交换机名" 的 Exchange, 运行测试一下, 效果如下:
2, 创建 Queue
接下来同理创建 Queue
- @Autowired
- AmqpAdmin amqpAdmin;
- @Test
- void clareExchange() {
- amqpAdmin.declareQueue(new Queue("测试队列. queue",true)); //true 代表是否持久化
- }
运行效果:
3, 创建 Binding
- @Autowired
- AmqpAdmin amqpAdmin;
- @Test
- void clareExchange() {
- amqpAdmin.declareBinding(new Binding("测试队列. queue",Binding.DestinationType.QUEUE,"交换机名","路由键名",null));
- }
运行效果:
3.7, 监听消息 @EnableRabbit + @RabbitListener
我们实际开发中, 常常有如下需求, 下单通过消息队列之后库存要随之改变, 也就是触发监听机制.
这种情况往往要通过:@EnableRabbit + @RabbitListener 监听消息队列的内容
, 特别注意 @RabbitListener 中的 queues 属性是个数组
, 也就是说 @RabbitListener 可以同时监听多个消息
- !
- String[] queues() default {
- };
当然这个监听实现也很简单, 只需要在需要监听的消息业务代码上添加 @RabbitListener 注解, 然后再 Application 主方法上添加 @EnableRabbit 注解开启基于注解的 rabbit 模式即可. 实例代码如下:
- @EnableRabbit // 开启基于注解的 rabbit 模式
- @SpringBootApplication
- public class RabbitmqApplication {
- public static void main(String[] args) {
- SpringApplication.run(RabbitmqApplication.class, args);
- }
- }
dao 代码:
- public class CatDao {
- private String name;
- private int age;
- // get,set,toString, 构造方法....
- }
service 业务代码
- @Service
- public class CatService {
- // 监听方式一: 直接将对象序列化输出
- @RabbitListener(queues="yichun.news")
- public void receive(CatDao cat){ // 这里的 CatDao 就是一个普通的 bean, 这里主要用于借助触发监听到事件打印出其对象信息
- System.out.println("监听到的信息:"+ cat);
- }
- // 监听方式二: 特殊需求需要消息头等信息
- @RabbitListener(queues="yichun")
- public void receiveMessage(Message message){ // 千万注意这里的 Message 是 org.springframework.amqp.core.Message 别导错包了
- System.out.println(message.getBody());
- System.out.println(message.getMessageProperties());
- }
- }
单元 Test 测试代码:
- @Test
- void contextLoads() {
- rabbitTemplate.convertAndSend("exchange.direct","yichun.news",new CatDao("Tom 猫",12)); // 这里的数据是 object 类型的
- }
监听方式一: 直接将对象序列化输出打印结果:
监听方式二特殊需求需要消息头等信息打印结果:
如果本文对你有一点点帮助, 那么请点个赞呗, 谢谢~
最后, 若有不足或者不正之处, 欢迎指正批评, 感激不尽! 如果有疑问欢迎留言, 绝对第一时间回复!
欢迎各位关注我的公众号, 里面有一些 java 学习资料和一大波 java 电子书籍, 比如说周志明老师的深入 java 虚拟机, java 编程思想, 核心技术卷, 大话设计模式, java 并发编程实战..... 都是 java 的圣经, 不说了快上 Tomcat 车, 咋们走! 最主要的是一起探讨技术, 向往技术, 追求技术, 说好了来了就是盆友喔...
来源: https://www.cnblogs.com/yichunguo/p/12173757.html