RabbitMQ 是一种我们经常使用的消息中间件, 通过 RabbitMQ 可以帮助我们实现异步, 削峰的目的.
今天这篇, 我们来看看 Spring Boot 是如何集成 RabbitMQ, 发送消息和消费消息的. 同时我们介绍下死信队列.
集成 RabbitMQ
集成 RabbitMQ 只需要如下几步即可
1, 添加 maven 依赖
- <!--rabbitmq-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
2, 添加配置文件 application.YAML
在 application.YAML 添加配置内容如下
- spring: rabbitmq:
- host: 192.168.1.161
- port: 5672
- username: guest
- password: guest
- cache:
- channel: size: 10
- listener:
- type: simple
- simple:
- acknowledge-mode: auto
- concurrency: 5
- default-requeue-rejected: true
- max-concurrency: 100
- retry:
- enabled: true # initial-interval: 1000ms
- max-attempts: 3 # max-interval: 1000ms
- multiplier: 1
- stateless: true # publisher-confirms: true</pre>
注意:
这里最基本的配置只需要配置 host,port,username 和 password 四个属性即可
其他属性都有各自的含义, 比如 retry 是用于配置重试策略的, acknowledge-mode 是配置消息接收确认机制的.
3, 编写配置类
编写 RabbitConfig 配置类, 采用 Java Configuration 的方式配置 RabbitTemplate,Exchange 和 Queue 等信息, 具体如下所示
- package com.jackie.springbootdemo.config;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.beans.factory.config.ConfigurableBeanFactory;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Scope;
- import java.util.HashMap;
- import java.util.Map;
- @Configuration public class RabbitMQConfig implements InitializingBean { @Autowired
- SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory;
- @Override
- public void afterPropertiesSet() throws Exception {
- simpleRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
- } @Bean("jackson2JsonMessageConverter")
- public Jackson2JsonMessageConverter jackson2JsonMessageConverter(ConnectionFactory connectionFactory) {
- return new Jackson2JsonMessageConverter();
- } @Bean("rabbitTemplate")
- @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
- @Qualifier("jackson2JsonMessageConverter") Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
- RabbitTemplate template = new RabbitTemplate(connectionFactory);
- template.setMessageConverter(new Jackson2JsonMessageConverter());
- return template;
- } // --------------------- 声明队列 ------------------------
- @Bean
- public Queue demoQueue() {
- return new Queue("demo_queue");
- } // --------------------- 声明 exchange ------------------------ @Bean
- public DirectExchange demoExchange() {
- return new DirectExchange("demo_exchange");
- } // --------------------- 队列绑定 ------------------------
- @Bean
- public Binding bindingAlbumItemCreatedQueue(DirectExchange demoExchange,
- Queue demoQueue) {
- return BindingBuilder.bind(demoQueue).to(demoExchange).with("100");
- } }
注意
这里声明了 Direct 模式的 Exchange, 声明一个 Queue, 并通过 routing-key 为 100 将 demo_queue 绑定到 demo_exchange, 这样 demo_queue 就可以接收到 demo_exchange 发送的消息了.
4, 编写消息发送类
- package com.jackie.springbootdemo.message;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.support.CorrelationData;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- @Component public class Sender implements RabbitTemplate.ConfirmCallback { private RabbitTemplate rabbitTemplate;
- /**
- * 构造方法注入 */ @Autowired
- public Sender(RabbitTemplate rabbitTemplate) {
- this.rabbitTemplate = rabbitTemplate;
- rabbitTemplate.setConfirmCallback(this); //rabbitTemplate 如果为单例的话, 那回调就是最后设置的内容
- } public void sendMsg(String content) {
- rabbitTemplate.convertAndSend("demo_exchange", "100", content);
- } /**
- * 回调 */ @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println("回调 id:" + correlationData);
- if (ack) {
- System.out.println("消息成功消费");
- } else {
- System.out.println("消息消费失败:" + cause);
- }
- } }
注意
发送内容 content, 路由到 routing-key 为 100 上, 则我们就可以在 demo_queue 队列中看到发送的消息内容了
confirm 函数是回调函数, 这里因为没有消费者, 且 acknoledge-mode 是 auto(其他两种值分别是 none 和 manual), 所以 ack 是 false.
5, 编写发送消息测试类
- package com.jackie.springbootdemo;
- import com.jackie.springbootdemo.message.Sender;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
- import org.springframework.test.context.web.WebAppConfiguration;
- @RunWith(SpringRunner.class) @SpringBootTest(classes = SpringbootDemoApplication.class) @WebAppConfiguration public class RabbitApplicationTests { @Autowired
- Sender sender;
- @Test
- public void contextLoads() throws Exception {
- sender.sendMsg("test");
- } }
运行该测试类, 我们可以看到如下结果
6, 编写消息消费类
- package com.jackie.springbootdemo.message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component public class Receiver { @RabbitListener(queues = "demo_queue")
- public void created(String message) {
- System.out.println("orignal message:" + message);
- } }
注意
消息消费类也非常简单, 添加注解 @RabbitListener, 指定要监听的队列名称即可
除了注解 @RabbitListener, 我们经常还能看到 @RabbitHandler, 这两个注解可以配合起来使用.
@RabbitListener 标注在类上面表示当有收到消息的时候, 就交给 @RabbitHandler 的方法处理, 具体使用哪个方法处理, 根据 MessageConverter 转换后的参数类型, 形如
- @RabbitListener(queues = "demo_queue") public class Receiver { @RabbitHandler public void processMessage1(String message) {
- System.out.println(message);
- } @RabbitHandler
- public void processMessage2(byte[] message) {
- System.out.println(new String(message));
- } }
7, 运行消息发送测试类
从执行结果可以看到, 因为有了消费者, 所以这次打印的结果是 "消息消费成功"
而且, 我们看到 Receiver 类将消息消费并打印出消息的内容为 "test".
代码已经提交至项目 https://github.com/DMinerJackie/rome :https://github.com/DMinerJackie/rome
本来准备再说说死信队列的, 限于篇幅, 后面再写吧.
来源: https://www.cnblogs.com/bigdataZJ/p/springboot-rabbitmq.html