前面的学习都是基于原生的 API, 下面我们使用 spingboot 来整合 rabbitmq
springboot 对 rabbitmq 提供了友好支持, 极大的简化了开发流程
引入 maven
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
配置 YAML
- rabbitmq:
- host: 47.102.103.232
- port: 5672
- username: admin
- password: admin
- virtual-host: /test
- publisher-confirms: true
- publisher-returns: true
- cache:
- channel:
- size: 10
- listener:
- simple:
- acknowledge-mode: manual
- concurrency: 1
- max-concurrency: 3
- retry:
- enabled: true
这是基础的配置, 看不懂的配置后面会介绍
更详细的配置参考官方 https://docs.spring.io/spring-boot/docs/2.1.3.RELEASE/reference/htmlsingle/#boot-features-rabbitmq(搜索 rabbit 往下拉即可)
代码实现
配置类
- @Configuration
- public class RabbitConfig {
- @Bean
- public Queue helloQueue() {
- return new Queue("helloQueue");
- }
- // 创建 topic 交换机
- @Bean
- public TopicExchange helloExchange() {
- return new TopicExchange("helloExchange");
- }
- @Bean
- public Binding bindingPaymentExchange(Queue helloQueue, TopicExchange helloExchange) {
- return BindingBuilder.bind(helloQueue).to(helloExchange).with("hello.#");
- }
- /**
- * 定制化 amqp 模版
- * connectionFactory: 包含了 YAML 文件配置参数
- */
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- // 必须设置为 true, 不然当 发送到交换器成功, 但是没有匹配的队列, 不会触发 ReturnCallback 回调
- // 而且 ReturnCallback 比 ConfirmCallback 先回调, 意思就是 ReturnCallback 执行完了才会执行 ConfirmCallback
- rabbitTemplate.setMandatory(true);
- // 设置 ConfirmCallback 回调 YAML 需要配置 publisher-confirms: true
- rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
- // 如果发送到交换器都没有成功(比如说删除了交换器),ack 返回值为 false
- // 如果发送到交换器成功, 但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true (这是一个坑, 需要注意)
- if (ack) {
- String messageId = correlationData.getId();
- System.out.println("confirm:"+messageId);
- }
- });
- // 设置 ReturnCallback 回调 YAML 需要配置 publisher-returns: true
- // 如果发送到交换器成功, 但是没有匹配的队列, 就会触发这个回调
- rabbitTemplate.setReturnCallback((message, replyCode, replyText,
- exchange, routingKey) -> {
- String messageId = message.getMessageProperties().getMessageId();
- System.out.println("return:"+messageId);
- });
- return rabbitTemplate;
- }
- }
回调机制
消息不管是否投递到交换机都进行 ConfirmCallback 回调, 投递成功 ack=true, 否则为 false
交换机匹配到队列成功则不进行 ReturnCallback 回调, 否则先进行 ReturnCallback 回调再进行 ConfirmCallback 回调
如果消息成功投递到交换机, 但没匹配到队列, 则 ConfirmCallback 回调 ack 仍为 true
生产者
- @Component
- public class RbProducer {
- // 注意一定要使用 RabbitTemplate!!
- // 虽然 RabbitTemplate 实现了 AmqpTemplate 但是 AmqpTemplate 里并没有能发送 correlationData 的方法
- @Resource
- private RabbitTemplate rbtemplate;
- public void send1(String msg){
- //CorrelationData 用于 confirm 机制里的回调确认
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- rbtemplate.convertAndSend("helloExchange", "hello.yj", msg,correlationData);
- }
- public void send2(User user){
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- rbtemplate.convertAndSend("helloExchange", "hello.yj", user,correlationData);
- }
- }
消费者
- @Component
- @RabbitListener(queues = "helloQueue")
- public class RbConsumer {
- @RabbitLister(queues = "helloQueue")
- public void receive0(Message msg, Channel channel) throws IOException {
- System.out.println("consumer receive message0:" + msg);
- channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
- }
- @RabbitHandler
- public void receive1(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException {
- System.out.println("consumer receive message1:" + msg);
- channel.basicAck(deliveryTag, false);
- }
- @RabbitHandler
- public void receive2(User user, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException {
- System.out.println("consumer receive message2:"+user);
- // 如果发生以下情况投递消息所有的通道或连接被突然关闭 (包括消费者端丢失 TCP 连接, 消费者应用程序(进程) 挂掉, 通道级别的协议异常)任何已经投递的消息但是没有被消费者端确认的消息会自动重新排队.
- // 请注意, 连接检测不可用客户端需要一段时间才会发现, 所以会有一段时间内的所有消息会重新投递
- // 因为消息的可能重新投递, 所有必须保证消费者端的接口的幂等.
- // 在 RabbitMQ 中影响吞吐量最大的参数是: 消息确认模式和 Qos 预取值
- // 自动消息确认模式或设置 Qos 预取值为无限虽然可以最大的提高消息的投递速度, 但是在消费者端未及时处理的消息的数量也将增加, 从而增加消费者 RAM 消耗, 使用消费者端奔溃. 所以以上两种情况需要谨慎使用.
- //RabbitMQ 官方推荐 Qos 预取值设置在 100 到 300 范围内的值通常提供最佳的吞吐量, 并且不会有使消费者奔溃的问题
- channel.basicAck(deliveryTag, false);
- channel.basicQos(100);
- // 代表消费者拒绝一条或者多条消息, 第二个参数表示一次是否拒绝多条消息, 第三个参数表示是否把当前消息重新入队
- // channel.basicNack(deliveryTag, false, false);
- // 代表消费者拒绝当前消息, 第二个参数表示是否把当前消息重新入队
- // channel.basicReject(deliveryTag,false);
- }
- }
@RabbitListener+@RabbitHandler: 消费者监听
使用 @RabbitListener+@RabbitHandler 组合进行监听, 监听器会根据队列发来的消息类型自动选择处理方法
channel.basicAck(deliveryTag, false): 手动确认机制
deliverTag: 该消息的标识, 每来一个消息该标识 + 1
multiple: 第二个参数标识书否批量确认
requeue: 被拒绝的是否重新入队
channel.basicQos(100): 最多未确认的消息数量为 100, 超过 100 队列将停止给该消费者投递消息
更多参数详解参考 https://www.cnblogs.com/piaolingzxh/p/5448927.html
测试
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = TestBoot.class)
- public class TestRabbit {
- @Resource
- private RbProducer producer;
- @Test
- public void send1() {
- producer.send1("hello,im a string");
- }
- @Test
- public void send2() {
- User user = new User();
- user.setNickname("hello,im a object");
- producer.send2(user);
- }
- }
成功消费
完结
下篇博客我们讨论下在拥有了手动 ack 机制, confirm 机制, return 机制后, 是否真的可靠~
来源: https://www.cnblogs.com/pokid/p/10527708.html