RabbitMQ 是一个在 https://baike.baidu.com/item/AMQP 基础上完成的, 可复用的企业消息系统. 他遵循 Mozilla Public License 开源协议. RabbitMQ 是流行的开源消息队列系统, 用 erlang 语言开发. RabbitMQ 是 AMQP(高级消息队列协议) 的标准实现.
消息中间件的工作过程可以用生产者消费者模型来表示. 即, 生产者不断的向消息队列发送信息, 而消费者从消息队列中消费信息.
如果你还没有安装 rabbitmq 的, 可以看看这篇《CentOS 安装 MQ》
不说了不说了, 来一张图直截了当的看看 MQ 工作的具体过程:
开局一张图 故事全靠编. 从上图可看出, 对于消息队列来说, 生产者, 消息队列, 消费者是最重要的三个概念, 生产者发消息到消息队列中去, 消费者监听指定的消息队列, 并且当消息队列收到消息之后, 接收消息队列传来的消息, 并且给予相应的处理. 消息队列常用于分布式系统之间互相信息的传递.
v 基础概念
对于 RabbitMQ 来说, 除了这三个基本模块以外, 还添加了一个模块, 即交换机 (Exchange). 它使得生产者和消息队列之间产生了隔离, 生产者将消息发送给交换机, 而交换机则根据调度策略把相应的消息转发给对应的消息队列. 那么 RabitMQ 的工作流程如下所示:
关于 rabbitmq 几个基础名词的介绍:
Broker: 简单来说就是消息队列服务器实体.
Exchange: 消息交换机, 它指定消息按什么规则, 路由到哪个队列.
Queue: 消息队列载体, 每个消息都会被投入到一个或多个队列.
Binding: 绑定, 它的作用就是把 exchange 和 queue 按照路由规则绑定起来.
Routing Key: 路由关键字, exchange 根据这个关键字进行消息投递.
vhost: 虚拟主机, 一个 broker 里可以开设多个 vhost, 用作不同用户的权限分离.
producer: 消息生产者, 就是投递消息的程序.
consumer: 消息消费者, 就是接受消息的程序.
channel: 消息通道, 在客户端的每个连接里, 可建立多个 channel, 每个 channel 代表一个会话任务.
交换机的主要作用是接收相应的消息并且绑定到指定的队列. 交换机有四种类型, 分别为 Direct,topic,headers,Fanout:
Direct: 处理路由键. 需要将一个队列绑定到交换机上, 要求该消息与一个特定的路由键完全匹配. 这是一个完整的匹配. 如果一个队列绑定到该交换机上要求路由键 "demo", 则只有被标记为 "demo" 的消息才被转发, 不会转发 demo.ooo, 也不会转发 test.123, 只会转发 demo.
Topic: 转发信息主要是依据通配符, 将路由键和某模式进行匹配. 此时队列需要绑定要一个模式上. 符号 "#" 匹配一个或多个词, 符号 "*" 匹配不多不少一个词. 因此 "audit.#" 能够匹配到 "audit.irs.corporate", 但是 "audit.*" 只会匹配到 "audit.irs".
Headers: 根据一个规则进行匹配, 在消息队列和交换机绑定的时候会指定一组键值对规则, 而发送消息的时候也会指定一组键值对规则, 当两组键值对规则相匹配的时候, 消息会被发送到匹配的消息队列中.
Fanout: 路由广播的形式, 将会把消息发给绑定它的全部队列, 即便设置了 key, 也会被忽略.
v 实战演练
2.1 创建 MQ
注: 若是现有工程引入 MQ, 则添加 Maven 引用.
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
这里我们延续之前 springboot 系列博文中的例子 hellospringboot, 在已有项目中添加 mq 的 Maven 引用.
2.2 application.properties
在 application.properties 文件当中引入 RabbitMQ 基本的配置信息
- # ----- MQ -------- #
- spring.rabbitmq.host=192.168.11.108
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
2.3 添加实体类 MyModel
- package com.demo.mq.model;
- import java.io.Serializable;
- import java.util.UUID;
- /**
- * Created by toutou on 2019/1/1.
- */
- public class MyModel implements Serializable {
- private static final long serialVersionUID = 1L;
- private UUID id;
- private String info;
- public UUID getId() {
- return id;
- }
- public void setId(UUID id) {
- this.id = id;
- }
- public String getInfo() {
- return info;
- }
- public void setInfo(String info) {
- this.info = info;
- }
- }
2.4 添加 RabbitConfig
- package com.demo.mq.common;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.beans.factory.config.ConfigurableBeanFactory;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Scope;
- /**
- * Created by toutou on 2019/1/1.
- */
- @Configuration
- public class RabbitConfig {
- @Value("${spring.rabbitmq.host}")
- private String host;
- @Value("${spring.rabbitmq.port}")
- private int port;
- @Value("${spring.rabbitmq.username}")
- private String username;
- @Value("${spring.rabbitmq.password}")
- private String password;
- public static final String EXCHANGE_A = "my-mq-exchange_A";
- public static final String EXCHANGE_B = "my-mq-exchange_B";
- public static final String QUEUE_A = "QUEUE_A";
- public static final String QUEUE_B = "QUEUE_B";
- public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
- public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
- @Bean
- public CachingConnectionFactory connectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setPublisherConfirms(true);
- return connectionFactory;
- }
- @Bean
- @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- public RabbitTemplate rabbitTemplate() {
- RabbitTemplate template = new RabbitTemplate(connectionFactory());
- return template;
- }
- /**
- * 针对消费者配置
- * 1. 设置交换机类型
- * 2. 将队列绑定到交换机
- FanoutExchange: 将消息分发到所有的绑定队列, 无 routingkey 的概念
- HeadersExchange : 通过添加属性 key-value 匹配
- DirectExchange: 按照 routingkey 分发到指定队列
- TopicExchange: 多关键字匹配
- */
- @Bean
- public DirectExchange defaultExchange() {
- return new DirectExchange(EXCHANGE_A);
- }
- /**
- * 获取队列 A
- * @return
- */
- @Bean
- public Queue queueA() {
- return new Queue(QUEUE_A, true); // 队列持久
- }
- /**
- * 获取队列 B
- * @return
- */
- @Bean
- public Queue queueB() {
- return new Queue(QUEUE_B, true); // 队列持久
- }
- /**
- * 把交换机, 队列, 通过路由关键字进行绑定
- * @return
- */
- @Bean
- public Binding binding() {
- return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
- }
- /**
- * 一个交换机可以绑定多个消息队列, 也就是消息通过一个交换机, 可以分发到不同的队列当中去.
- * @return
- */
- @Bean
- public Binding bindingB(){
- return BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B);
- }
- }
2.5 添加消息的生产者 MyProducer
- package com.demo.mq.producer;
- import com.demo.mq.common.RabbitConfig;
- import com.demo.mq.model.MyModel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.support.CorrelationData;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- /**
- * Created by toutou on 2019/1/1.
- */
- @Component
- public class MyProducer implements RabbitTemplate.ConfirmCallback {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- // 由于 rabbitTemplate 的 scope 属性设置为 ConfigurableBeanFactory.SCOPE_PROTOTYPE, 所以不能自动注入
- private RabbitTemplate rabbitTemplate;
- /**
- * 构造方法注入 rabbitTemplate
- */
- @Autowired
- public MyProducer(RabbitTemplate rabbitTemplate) {
- this.rabbitTemplate = rabbitTemplate;
- rabbitTemplate.setConfirmCallback(this); //rabbitTemplate 如果为单例的话, 那回调就是最后设置的内容
- }
- public void sendMsg(MyModel model) {
- // 把消息放入 ROUTINGKEY_A 对应的队列当中去, 对应的是队列 A
- rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, model);
- }
- /**
- * 回调
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- logger.info("回调 id:" + correlationData);
- if (ack) {
- logger.info("消息成功消费");
- } else {
- logger.info("消息消费失败:" + cause);
- }
- }
- }
2.6 添加消息的消费者 MyReceiver
- package com.demo.mq.receiver;
- import com.demo.mq.common.RabbitConfig;
- import com.demo.mq.model.MyModel;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- /**
- * Created by toutou on 2019/1/1.
- */
- @Component
- @RabbitListener(queues = RabbitConfig.QUEUE_A)
- public class MyReceiver {
- @RabbitHandler
- public void process(MyModel model) {
- System.out.println("接收处理队列 A 当中的消息:" + model.getInfo());
- }
- }
2.7 添加 MyMQController
- package com.demo.controller;
- import com.demo.mq.model.MyModel;
- import com.demo.mq.producer.MyProducer;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.Web.bind.annotation.RestController;
- import java.util.UUID;
- /**
- * Created by toutou on 2019/1/1.
- */
- @RestController
- @Slf4j
- public class MyMQController {
- @Autowired
- MyProducer myProducers;
- @GetMapping("/mq/producer")
- public String myProducer(String content){
- MyModel model = new MyModel();
- model.setId(UUID.randomUUID());
- model.setInfo(content);
- myProducers.sendMsg(model);
- return "已发送:" + content;
- }
- }
2.8 项目整体目录
2.9 调试
2.9.1 在页面中请求 http://localhost:8081/mq/producer?content=hello rabbitmq
2.9.2 查看 http://ip:15672/#/queues 的变化
关于 RabbitMQ Management 有疑问的, 可以看上篇博文.《浅谈 RabbitMQ Management》.
2.9.3 查看消费者日志记录
这样一个完整的 rabbitmq 实例就有了.
v 源码地址
作 者: 请叫我头头哥
来源: https://www.cnblogs.com/toutou/p/springboot_rabbitmq.html