消息队列中间件是分布式系统中重要的组件, 主要解决应用耦合, 异步消息, 流量削锋等问题, 实现高性能, 高可用, 可伸缩和最终一致性架构使用较多的消息队列有 ActiveMQ,RabbitMQ,Kafka,RocketMQ 等, 这里主要讲解 RabbitMQ 的简单使用
创建 SpringBoot 项目, 并引入依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
Queue 方式
定义配置类
- /**
- * @author Gjing
- **/
- @Configuration
- public class RabbitMqConfiguration {
- /**
- * 声明一个名为 simple 的队列
- */
- @Bean
- public Queue testQueue() {
- return new Queue("simple");
- }
- }
声明一个生产者
- /**
- * @author Gjing
- **/
- @Component
- public class Producer {
- @Resource
- private AmqpTemplate rabbitTemplate;
- public void send() {
- String message = "hello";
- this.rabbitTemplate.convertAndSend("simple", message);
- }
- }
声明消费者
- /**
- * @author Gjing
- **/
- @Slf4j
- @Component
- public class Consumer {
- @RabbitListener(queues = "simple")
- public void receive(String message) {
- log.info("消费者 1 收到消息:{}", message);
- }
- }
创建 Controller 进行调用
- /**
- * @author Gjing
- **/
- @RestController
- public class DemoController {
- @Resource
- private Producer producer;
- @PostMapping("/message")
- public void send() {
- for (int i = 0; i < 10; i++) {
- this.producer.send();
- }
- }
- }
执行结果
topic exchange 方式
定义配置类
- /**
- * @author Gjing
- **/
- @Configuration
- public class RabbitMqConfiguration {
- /**
- * 声明一个名为 topic.message1 的队列
- */
- @Bean
- public Queue topicQueue() {
- return new Queue("topic.message1");
- }
- /**
- * 声明一个名为 topic.message2 的队列
- */
- @Bean
- public Queue topicQueue2() {
- return new Queue("topic.message2");
- }
- /**
- * 声明一个名为 exchange 的交换机
- */
- @Bean
- public TopicExchange exchange() {
- return new TopicExchange("exchange");
- }
- /**
- * 将 topic.message1 的队列绑定到 exchange 交换机
- */
- @Bean
- public Binding bindMessage1() {
- return BindingBuilder.bind(topicQueue()).to(exchange()).with("topic.message1");
- }
- /**
- * 将 topic.message2 的队列绑定到 exchange 交换机
- */
- @Bean
- public Binding bindMessage2() {
- return BindingBuilder.bind(topicQueue2()).to(exchange()).with("topic.message2");
- }
- }
定义生产者
- /**
- * @author Gjing
- **/
- @Component
- public class TopicProducer {
- @Resource
- private AmqpTemplate rabbitTemplate;
- public void send() {
- String message1 = "I am topic.message1";
- String message2 = "I am topic.message2";
- this.rabbitTemplate.convertAndSend("exchange", "topic.message1", message1);
- this.rabbitTemplate.convertAndSend("exchange", "topic.message2", message2);
- }
- }
定义消费者 1
- /**
- * @author Gjing
- **/
- @Component
- @Slf4j
- public class TopicConsumer1 {
- @RabbitListener(queues = "topic.message1")
- public void receive(String message) {
- log.info("消费者 1 收到消息:{}", message);
- }
- }
定义消费者 2
- /**
- * @author Gjing
- **/
- @Component
- @Slf4j
- public class TopicConusmer2 {
- @RabbitListener(queues = "topic.message2")
- public void receive(String message) {
- log.info("消费者 2 收到消息:{}", message);
- }
- }
创建 controller 进行调用
- /**
- * @author Gjing
- **/
- @RestController
- public class TopicController {
- @Resource
- private TopicProducer topicProducer;
- @PostMapping("/message-topic")
- public void sendMessageTopic() {
- for (int i = 0; i < 10; i++) {
- this.topicProducer.send();
- }
- }
- }
执行结果
fanout 方式
定义配置类
- /**
- * @author Gjing
- **/
- @Configuration
- public class RabbitMqConfiguration {
- /**
- * 声明一个名为 fanout.1 的队列
- */
- @Bean
- public Queue fanoutQueue1() {
- return new Queue("fanout.1");
- }
- /**
- * 声明一个名为 fanout.2 的队列
- */
- @Bean
- public Queue fanoutQueue2() {
- return new Queue("fanout.2");
- }
- /**
- * 声明一个名为 fanout.3 的队列
- */
- @Bean
- public Queue fanoutQueue3() {
- return new Queue("fanout.3");
- }
- /**
- * 声明一个名为 fanoutExchange 的转发器
- */
- @Bean
- public FanoutExchange fanoutExchange() {
- return new FanoutExchange("fanoutExchange");
- }
- /**
- * 将队列 fanoutQueue1 绑定到 fanout 转发器
- */
- @Bean
- public Binding bindFanout1() {
- return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
- }
- /**
- * 将队列 fanoutQueue1 绑定到 fanout 转发器
- */
- @Bean
- public Binding bindFanout2() {
- return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
- }
- /**
- * 将队列 fanoutQueue1 绑定到 fanout 转发器
- */
- @Bean
- public Binding bindFanout3() {
- return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
- }
- }
定义生产者
- /**
- * @author Gjing
- **/
- @Component
- public class FanoutProducer {
- @Resource
- private AmqpTemplate amqpTemplate;
- public void send() {
- String message = "hello, I am speaker";
- // 这里的 routingKey 会被 rabbitMQ 忽略, 如果不设置这个参数会导致发送消息失败,
- // 所以这里随便写 (我给他空字符串),rabbitMQ 会默认发给所有绑定的
- this.amqpTemplate.convertAndSend("fanoutExchange","", message);
- }
- }
定义消费者 1
- /**
- * @author Gjing
- **/
- @Component
- @Slf4j
- public class FanoutConsumer1 {
- @RabbitListener(queues = "fanout.1")
- public void receive(String message) {
- log.info("消费者 1 收到消息:{}", message);
- }
- }
定义消费者 2
- /**
- * @author Gjing
- **/
- @Component
- @Slf4j
- public class FanoutConsumer2 {
- @RabbitListener(queues = "fanout.2")
- public void receive(String message) {
- log.info("消费者 2 收到消息:{}", message);
- }
- }
定义消费者 3
- /**
- * @author Gjing
- **/
- @Component
- @Slf4j
- public class FanoutConsumer3 {
- @RabbitListener(queues = "fanout.3")
- public void receive(String message) {
- log.info("消费者 3 收到消息:{}", message);
- }
- }
创建 controller 调用
- /**
- * @author Gjing
- **/
- @RestController
- public class FanoutController {
- @Resource
- private FanoutProducer fanoutProducer;
- @PostMapping("/message-fanout")
- public void sendFanout() {
- this.fanoutProducer.send();
- }
- }
执行结果
RabbitMQ 核心概念
server: 又称 Broker, 接受客户端的连接实现 AMQP 实体服务; connection: 与 broker 的连接; channel: 网络通道, 几乎所有的操作都是在 channel 中进行; message: 服务器和应用程序之间传送的数据, 由 properties 和 body 组成, properties 可以对消息进行修饰, 比如消息的优先级和高级特性, body 为消息的内容; exchange: 交换机, 接收消息, 根据路邮件转发消息到绑定的队列; binding:exchange 和 queue 之间的虚拟连接, 可以包含 routing key;Routing key: 一个路由规则, 虚拟机用他确定如何路由一个特定信息; Queue: 也称为 message Queue, 消息队列, 保存信息并将它们转发给消费者.
Exchange 类型:
Fanout: 路由规则是把所有发送到该 Exchange 的消息路由到所有与她绑定的 Queue 中
备注: 生产者 P 生产消息 1 推送到 Exchange, 由于 Exchange Type=fanout 这时候会遵循 fanout 的规则将消息推送到所有与他绑定的 Queue.
direct: 把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中.
备注: 生产者 P 发送消息时 Routing key = bloking 时, 这时候将消息传送到 Exchange,Exchange 获取到生产者发送过来的消息后, 会根据自身的规则进行与匹配响应的 Queue, 这时候发现 Queue1 和 Queue2 都符合, 就会将消息传送给这两个队列, 如果我们以 Routing key = create 和 routing key = confirm 发送消息时, 这时候消息只会被推送到 Queue2 队列中, 其他的 Routing key 的消息会被丢弃.
topic: 模糊匹配, 通过通配符满足一部分规则就可以传送, 其中注意的是有两个字符 '星号' 和 #号, 其中 星号 用于匹配一个单词,# 号用于匹配多个单词 (可以是 0 个)
备注: 当生产者发送消息 Routing Key=F.C.E 的时候, 这时候只满足 Queue1, 所以会被路由到 Queue 中, 如果 Routing Key=A.C.E 这时候会被同是路由到 Queue1 和 Queue2 中, 如果 Routing Key=A.F.B 时, 这里只会发送一条消息到 Queue2 中.
常见面试题
什么是元数据? 元数据分为哪些类型? 包括哪些内容? 与 cluster 相关的元数据有哪些? 元数据是如何保存的? 元数据在 cluster 中是如何分布的?
在非 cluster 模式中, 元数据主要分为 Queue 元数据 (Queue 名字和属性等),Exchange 元数据 (Exchange 名字, 类型, 属性等),binding 元数据 (存放路由关系的查找表),vhost 元数据 (vhost 范围内针对前三者的名字空间约束和安全属性设置). 在 cluster 模式下, 包括 cluster 中 node 位置信息和 node 关系信息. 元数据按照 erlang node 的类型确定是仅保存于 RAM 中, 还是同时保存在 RAM 或者 Disk 上, 元数据在 cluster 中是全 node 分布的
rabbitmq 的一个 Queue 中存放的 message 是否有数量限制?
可以认为无限制, 限制取决于机器的内存, 但是消息过多会导致处理效率的下降.
rabbitmq 如何实现延迟队列?
没有直接支持延迟队列功能, 但是可以通过两个特性来实现延迟队列,1TTL: 通过队列属性设置, 队列中的所有消息都有相同的过期时间, 对消息进行单独设置, 每条消息 TTL 可以不同. 如果同时使用, 则消息的过期时间以两者之间 TTL 较小的那个数值为准, 消息在队列的生存时间一旦超过设置的 TTL 值, 就称为 dead letter.2DLX:Queue 可以配置 X-dead-letter-exchange 和 x-dead-letter-routing-key(可选) 两个参数, 如果队列内出现了 dead letter, 则按照这两个参数重新路由转发到指定的队列.
X-dead-letter-exchange: 出现 dead letter 之后将 dead letter 重新发送到指定 exchange
出现 dead letter 之后将 dead letter 重新按照指定的 routing-key 发送
出现 dead letter 的情况有:
消息或者队列的 TTL 过期; 2. 队列达到最大长度; 3. 消息被消费者拒绝
前往第二章: SpringBoot 使用 Redis(二)
以上为个人见解, 如有误欢迎各位指正
来源: https://yq.aliyun.com/articles/705358