yls
2020/5/10
rabbitmq 的基本概念和其它相关知识请自主去官网学习
rabbitmq 官网 https://www.rabbitmq.com/#getstarted ,
本文只介绍 rabbitmq 在 springboot 中如何使用
添加依赖包
- <!--rabbitmq 客户端 start-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- <version>2.2.7.RELEASE</version>
- </dependency>
- <!--rabbitmq 客户端 end-->
添加配置文件 application.YAML
- spring:
- rabbitmq: #rabbit 连接配置信息
- host: 39.97.234.52
- port: 5672
- username: admin
- password: admin
- virtual-host: /vhost_1
rabbitmq 五种模式的使用
1. 简单队列
创建消费者
- @Component
- public class MQ {
- /**
- * 简单队列
- * autoDelete = "true" 表示没有生产者和消费者连接时自动删除
- * durable = "true" 表示队列持久化, 默认就是 true
- * @param msg
- */
- @RabbitListener(queuesToDeclare = @Queue(value = "simpleQueue",autoDelete = "true", durable = "true"))
- public void simpleQueue(String msg) {
- System.out.println("接收" + msg);
- }
- }
创建生产者
- @SpringBootTest
- public class RabbitTest {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Test
- public void simpleQueue() {
- rabbitTemplate.convertAndSend("simpleQueue", "this is simpleQueue")
- System.out.println("simple success");
- }
- }
2. 工作队列, 实现了能者多劳
创建消费者
- @Component
- public class MQ {
- /**
- * 工作队列, 多个消费者消费一个队列
- * <p>
- * AMQP 默认实现消费者确认模式, 原文如下
- * It's a common mistake to miss the basicAck and Spring AMQP helps to avoid this through its default configuration.
- * The consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery),
- * but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.
- * <p>
- * Fair dispatch vs Round-robin dispatching
- * 官网说: AMQP 默认实现消费者 fair 转发, 也就是能者多劳, 原文如下 (应该是说反了, 默认的是 250, 但是是 Round-robin dispatching)
- * However, "Fair dispatch" is the default configuration for Spring AMQP.
- * The AbstractMessageListenerContainer defines the value for DEFAULT_PREFETCH_COUNT to be 250.
- * If the DEFAULT_PREFETCH_COUNT were set to 1 the behavior would be the round robin delivery as described above.
- */
- // 设置消费者的确认机制, 并达到能者多劳的效果
- @Bean("workListenerFactory")
- public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
- SimpleRabbitListenerContainerFactory containerFactory =
- new SimpleRabbitListenerContainerFactory();
- containerFactory.setConnectionFactory(connectionFactory);
- // 自动 ack, 没有异常的情况下自动发送 ack
- //auto 自动确认, 默认是 auto
- //MANUAL 手动确认
- //none 不确认, 发完自动丢弃
- containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
- // 拒绝策略, true 回到队列 false 丢弃, 默认是 true
- containerFactory.setDefaultRequeueRejected(true);
- // 默认的 PrefetchCount 是 250, 采用 Round-robin dispatching, 效率低
- //setPrefetchCount 为 1, 即可启用 fair 转发
- containerFactory.setPrefetchCount(1);
- return containerFactory;
- }
- /**
- * 若不使用自定义 containerFactory = "workListenerFactory", 默认的轮询消费效率低
- *
- * @param s
- */
- @RabbitListener(queuesToDeclare = @Queue("workQueue"), containerFactory = "workListenerFactory")
- public void workQueue1(String s) {
- System.out.println("workQueue 1" + s);
- }
- @RabbitListener(queuesToDeclare = @Queue("workQueue"), containerFactory = "workListenerFactory")
- public void workQueue2(String s) throws InterruptedException {
- Thread.sleep(1000);
- System.out.println("workQueue 2" + s);
- }
- }
创建生产者
- @SpringBootTest
- public class RabbitTest {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Test
- public void workQueue() {
- for (int i = 0; i <10; i++) {
- rabbitTemplate.convertAndSend("workQueue", i);
- }
- System.out.println("workQueue success");
- }
- }
3. 订阅模式
创建消费者
- @Component
- public class MQ {
- /**
- * 订阅模式 fanout
- */
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue, // 临时路由
- exchange = @Exchange(value = "exchange1", type = ExchangeTypes.FANOUT))
- })
- public void fanout(String s) {
- System.out.println("订阅模式 1" + s);
- }
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue, exchange = @Exchange(value = "exchange1", type = ExchangeTypes.FANOUT))
- })
- public void fanout2(String s) {
- System.out.println("订阅模式 2" + s);
- }
- }
创建生产者
- @SpringBootTest
- public class RabbitTest {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Test
- public void fanOut() {
- rabbitTemplate.convertAndSend("exchange1", "","fan out......");
- }
- }
4. 路由模式
创建消费者
- @Component
- public class MQ {
- /**
- * 路由模式 DIRECT
- */
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue, // 临时路由
- exchange = @Exchange(value = "exchange2", type = ExchangeTypes.DIRECT),
- key = {"error", "info"} // 路由键
- )
- })
- public void router(String s) {
- System.out.println("路由模式 1" + s);
- }
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue,
- exchange = @Exchange(value = "exchange2", type = ExchangeTypes.DIRECT),
- key = {"error"} // 路由键
- )
- })
- public void router2(String s) {
- System.out.println("路由模式 2" + s);
- }
- }
创建生产者
- @SpringBootTest
- public class RabbitTest {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Test
- public void router() {
- rabbitTemplate.convertAndSend("exchange2", "info", "router");
- System.out.println("router");
- }
- }
5. 主题模式
创建消费者
- @Component
- public class MQ {
- /**
- * topic topics
- */
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue,
- exchange = @Exchange(value = "exchange3", type = ExchangeTypes.TOPIC),
- key = {"user.#"} // 路由键
- )
- })
- public void topic2(String s) {
- System.out.println("topic2" + s);
- }
- }
创建生产者
- @SpringBootTest
- public class RabbitTest {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Test
- public void topic() {
- rabbitTemplate.convertAndSend("exchange3", "user.name", "hhh");
- System.out.println("topic");
- }
- }
默认消息是持久化的, 也可以设置不持久化, 以简单队列示例
- @SpringBootTest
- public class RabbitTest {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- /**
- * AMQP 默认消息是持久化的, 但只有在队列也是持久化时才有作用, 原文如下:
- * Messages are persistent by default with Spring AMQP.
- * Note the queue the message will end up in needs to be durable as well,
- * otherwise the message will not survive a broker restart as a non-durable queue does not itself survive a restart.
- * <p>
- * MessageProperties 类中源码如下:
- * static {
- * DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
- * DEFAULT_PRIORITY = 0;
- * }
- * <p>
- * 如何设置消息不持久化?
- * 设置消息不持久化, 默认是持久化的, 这里只为记录如何设置消息不持久化, 一般不设置
- * 发送消息时, 添加 MessagePostProcessor 即可, 这里使用 lambda 表达式
- * (message) -> {
- * message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
- * return message;
- * }
- * <p>
- * 完整示例如下:
- * rabbitTemplate.convertAndSend("simpleQueue", "this is simpleQueue",
- * (message) -> {
- * message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
- * return message;
- * });
- */
- @Test
- public void simpleQueue() {
- rabbitTemplate.convertAndSend("simpleQueue", "this is simpleQueue",
- (message) -> {
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
- return message;
- });
- System.out.println("simple success");
- }
- }
如何设置生产者消息确认, 避免消息发送失败而丢失 (确认分为两步, 一是确认是否到达交换器, 二是确认是否到达队列.)
参考资料: https://www.cnblogs.com/wangiqngpei557/p/9381478.html
在配置文件中添加
- spring:
- rabbitmq: #rabbit 连接配置信息
- publisher-returns: true #开启消息从 交换机 ----》队列发送失败的回调
- publisher-confirm-type: correlated #开启消息从 生产者 ----》交换机的回调
添加配置类
- @Component
- public class ProducerConfig {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @PostConstruct
- public void pre() {
- /**
- *
- * 消息发送到交换机的回调
- *
- * public void confirm(CorrelationData correlationData, boolean b, String s) {
- *
- * System.out.println("消息唯一标识:"+correlationData);
- * System.out.println("确认结果:"+ b);
- * System.out.println("失败原因:"+ s);
- * }
- *
- */
- rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
- System.out.println("setConfirmCallback-------------------");
- System.out.println("correlationData:" + correlationData);
- System.out.println(ack);
- System.out.println(cause);
- if (ack) {
- System.out.println("发送成功");
- } else {
- System.out.println("发送失败");
- // 可以记录下来, 也可以重新发送消息...
- }
- });
- /**
- *
- * 消息从交换机发送到队列的回调, 只有发送失败时才会回调
- * public void returnedMessage(Message message, int i, String s, String s1, String s2) {
- * System.out.println("消息主体 message :"+message);
- * System.out.println("消息主体 message :"+ i);
- * System.out.println("描述:"+ s);
- * System.out.println("消息使用的交换器 exchange :"+ s1);
- * System.out.println("消息使用的路由键 routing :"+ s2);
- * }
- */
- rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
- System.out.println("setReturnCallback---------------------");
- System.out.println("消息主体 message :" + message);
- System.out.println("响应码 replyCode:" + replyCode);
- System.out.println("响应内容 replyText:" + replyText);
- System.out.println("消息使用的交换器 exchange :" + exchange);
- System.out.println("消息使用的路由键 routeKey :" + routingKey);
- // 也可以重新发送消息
- rabbitTemplate.convertAndSend(exchange, routingKey, new String(message.getBody()));
- System.out.println("重新发送消息: -----" + new String(message.getBody()));
- });
- /**
- * 网上都说必须设置 rabbitTemplate.setMandatory(true), 才能触发 ReturnCallback 回调,
- * 我尝试了一下, 并不需要设置为 true, 交换机发送消息给队列失败时, 也能触发回调
- */
- //rabbitTemplate.setMandatory(true);
- }
- }
代码 GitHub 地址: https://github.com/1612480331/Spring-Boot-rabbitmq https://github.com/1612480331/Spring-Boot-rabbitmq
来源: https://www.cnblogs.com/yloved/p/12864289.html