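Overview
This article describes how to connect to the Alibaba Cloud AMQP service from Spring Boot.
Procedure
1. Download the sample program (download link).
2. Configure the parameters, using the values shown in the AMQP management console.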
- resources -> application.properties
- spring.application.name=rabbitmq-demo
- spring.rabbitmq.host=18********617278.mq-amqp.cn-hangzhou-a.aliyuncs.com
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=******
- spring.rabbitmq.password=******
- spring.rabbitmq.virtual-host=******
- spring.rabbitmq.template.mandatory=true
- spring.rabbitmq.publisher-confirms=true
- spring.rabbitmq.publisher-returns=true
- RabbitConfig -> RESOURCE_OWNER_ID
- private static final long RESOURCE_OWNER_ID = 18********617278L; // account ID of the resource owner
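The Spring Boot version used by the demo is not stated here. As a hedged note: from Spring Boot 2.2 onward, `spring.rabbitmq.publisher-confirms` is deprecated in favor of `spring.rabbitmq.publisher-confirm-type`. Assuming a project on 2.2 or later, the confirm-related lines would look roughly like this (the other connection properties stay unchanged):

```properties
# Assumption: Spring Boot 2.2 or later; on earlier versions keep publisher-confirms=true
# "correlated" delivers confirms to the ConfirmCallback together with the CorrelationData
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
```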
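3. Adjust the code. (The default demo only tests sending a message that cannot be routed. To make the test more complete, some of the code and comments were adjusted to make it easier to follow. You can also skip this step and run the test directly.)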
- SenderWithCallback.class
- package com.alibaba.rabbit;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- import java.time.LocalDateTime;
- import java.util.UUID;
- @Component
- public class SenderWithCallback {
- Logger log= LoggerFactory.getLogger(SenderWithCallback.class);
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @PostConstruct
- public void initRabbitTemplate() {
- // Register the publisher confirm and return callbacks
- rabbitTemplate.setConfirmCallback(new RabbitConfirmCallback());
- rabbitTemplate.setReturnCallback(new RabbitReturnCallback());
- }
- public void send() {
- String exchange = "exchange-rabbit-springboot-advance5";
- String routingKey = "product";
- String unRoutingKey = "norProduct";
- // 1. Send a message that cannot be routed; triggers ReturnCallback and ConfirmCallback
- String message = LocalDateTime.now().toString() + "发送一条消息.";
- rabbitTemplate.convertAndSend(exchange, unRoutingKey, message, new CorrelationData("unRouting-" + UUID.randomUUID().toString()));
- log.info("发送一条消息, exchange:[{}],routingKey:[{}],message:[{}]", exchange, unRoutingKey, message);
- // 2. Send a routable message; triggers ConfirmCallback only
- rabbitTemplate.convertAndSend(exchange, routingKey, message, new CorrelationData("Routing-" + UUID.randomUUID().toString()));
- log.info("发送一条消息, exchange:[{}],routingKey:[{}],message:[{}]", exchange, routingKey, message);
- // 3. Send a test message directly to the queue named "queue" for the listener to consume
- rabbitTemplate.convertAndSend("queue", "test queue message.");
- }
- }
- RabbitReturnCallback.class
- package com.alibaba.rabbit;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- /**
- * ReturnCallback implementation.
- * Invoked when a message reaches the exchange but no queue matches its routing key; runs before the ConfirmCallback.
- */
- public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {
- Logger log= LoggerFactory.getLogger(RabbitReturnCallback.class);
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- log.error("message:"+message+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);
- }
- }
- RabbitConfirmCallback.class
- package com.alibaba.rabbit;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
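- /**
- * After the producer sends a message and it reaches RabbitMQ, the broker returns an arrival confirmation.
- * This confirmation is what the official documentation calls the ConfirmCallback; the producer listens for the confirmation returned by RabbitMQ through a callback class.
- * In Spring AMQP, the confirmation callback is enabled by setting the ConfirmCallback property of RabbitTemplate, with the callback logic in a class that implements ConfirmCallback.
- */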
- public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {
- Logger log= LoggerFactory.getLogger(RabbitConfirmCallback.class);
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- log.error("correlationData:"+correlationData+",ack:"+ack+",cause:"+cause); // ack == true means the broker received the message
- }
- }
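Because an unroutable message triggers the ReturnCallback and then still receives a confirm with ack=true, the confirm alone does not say whether the message reached a queue. The two callbacks can be combined per message through the correlation ID. The following is a minimal plain-JDK sketch of that idea, not part of the demo: the class `DeliveryTracker` and its method names are hypothetical, and it assumes the return for a message arrives before its confirm, as the Javadoc of RabbitReturnCallback states.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper, not part of the demo: derives a per-message outcome from both callbacks. */
final class DeliveryTracker {

    enum Outcome { ROUTED, UNROUTABLE, NACKED, UNKNOWN }

    // Correlation ID -> reply code of messages the broker handed back (e.g. 312 NO_ROUTE).
    private final Map<String, Integer> returned = new ConcurrentHashMap<>();

    /** Call from the ReturnCallback with the correlation ID carried by the returned message. */
    void onReturned(String correlationId, int replyCode) {
        if (correlationId != null) {
            returned.put(correlationId, replyCode);
        }
    }

    /** Call from the ConfirmCallback; consumes any return recorded for the same ID. */
    Outcome onConfirm(String correlationId, boolean ack) {
        Integer replyCode = (correlationId == null) ? null : returned.remove(correlationId);
        if (!ack) {
            return Outcome.NACKED;      // the broker did not accept the message
        }
        if (correlationId == null) {
            return Outcome.UNKNOWN;     // no ID was supplied, so a return cannot be matched
        }
        return (replyCode != null) ? Outcome.UNROUTABLE : Outcome.ROUTED;
    }

    public static void main(String[] args) {
        DeliveryTracker tracker = new DeliveryTracker();
        tracker.onReturned("unRouting-1", 312);                      // return arrives first
        System.out.println(tracker.onConfirm("unRouting-1", true)); // UNROUTABLE
        System.out.println(tracker.onConfirm("Routing-1", true));   // ROUTED
        System.out.println(tracker.onConfirm(null, true));          // UNKNOWN
    }
}
```

To wire this into the demo, RabbitReturnCallback could read the ID from the `spring_returned_message_correlation` header (the header name that appears in the returned message in the sample output) via `message.getMessageProperties().getHeaders()`, and RabbitConfirmCallback could pass `correlationData.getId()` when `correlationData` is not null. The third send in SenderWithCallback supplies no CorrelationData, which is the UNKNOWN case.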
Test run
Start the application, then request the send operation from a browser: http://localhost:8080/test/send
Output
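The same request can also be issued from code instead of a browser. A minimal sketch using the JDK 11+ `java.net.http.HttpClient`: the class name `SendTrigger` is hypothetical, and it assumes the demo is already running locally on port 8080 and that the endpoint answers a GET request, which is what a browser address-bar request sends.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper, not part of the demo: calls the demo's send endpoint. */
final class SendTrigger {

    /** Builds a GET request for the /test/send path under the given base URL. */
    static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/test/send"))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Assumption: the application is running on localhost:8080
        HttpResponse<String> response = client.send(
                buildRequest("http://localhost:8080"),
                HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP " + response.statusCode() + ": " + response.body());
    }
}
```

The callback and receiver log lines appear in the application's console, not in the HTTP response.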
2019-06-07 12:42:43.693 INFO 2752 --- [nio-8080-exec-1] com.alibaba.rabbit.SenderWithCallback : 发送一条消息, exchange:[exchange-rabbit-springboot-advance5],routingKey:[norProduct],message:[2019-06-07T12:42:43.612 发送一条消息.]
- 2019-06-07 12:42:43.727 ERROR 2752 --- [124.156.22:5672] com.alibaba.rabbit.RabbitReturnCallback : message:(Body:'2019-06-07T12:42:43.612 发送一条消息.' MessageProperties [headers={
- spring_returned_message_correlation=unRouting-75d1580a-c93d-4d9c-bf06-2ed05ad8e464
- }, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),replyCode:312,replyText:NO_ROUTE,exchange:exchange-rabbit-springboot-advance5,routingKey:norProduct
- 2019-06-07 12:42:43.728 ERROR 2752 --- [124.156.22:5672] c.alibaba.rabbit.RabbitConfirmCallback : correlationData:CorrelationData [id=unRouting-75d1580a-c93d-4d9c-bf06-2ed05ad8e464],ack:true,cause:null
2019-06-07 12:42:43.753 INFO 2752 --- [nio-8080-exec-1] com.alibaba.rabbit.SenderWithCallback : 发送一条消息, exchange:[exchange-rabbit-springboot-advance5],routingKey:[product],message:[2019-06-07T12:42:43.612 发送一条消息.]
- 2019-06-07 12:42:43.891 ERROR 2752 --- [124.156.22:5672] c.alibaba.rabbit.RabbitConfirmCallback : correlationData:CorrelationData [id=Routing-9ee37a35-0f8e-4a0d-b00b-2fc0fab574b4],ack:true,cause:null
- 2019-06-07 12:42:43.892 ERROR 2752 --- [124.156.22:5672] c.alibaba.rabbit.RabbitConfirmCallback : correlationData:null,ack:true,cause:null
- Receiver : test queue message.
References
https://github.com/AliwareMQ/amqp-demos
RabbitMQ producer message confirmation
Source: https://yq.aliyun.com/articles/704801