一, 前言
延迟队列的使用场景: 1. 未按时支付的订单, 30 分钟过期之后取消订单; 2. 给活跃度比较低的用户间隔 N 天之后推送消息, 提高活跃度; 3. 过 1 分钟给新注册会员的用户, 发送注册邮件等.
实现延迟队列的方式有两种:
通过消息过期后进入死信交换器, 再由交换器转发到延迟消费队列, 实现延迟功能;
使用 rabbitmq-delayed-message-exchange 插件实现延迟功能;
注意: 延迟插件 rabbitmq-delayed-message-exchange 是在 RabbitMQ 3.5.7 及以上的版本才支持的, 依赖 Erlang/OPT 18.0 及以上运行环境.
由于使用死信交换器相对曲折, 本文重点介绍第二种方式, 使用 rabbitmq-delayed-message-exchange 插件完成延迟队列的功能.
二, 安装延迟插件
1.1 下载插件
打开官网下载: http://www.rabbitmq.com/community-plugins.html
选择相应的对应的版本 "3.7.x" 点击下载.
注意: 下载的是. zip 的安装包, 下载完之后需要手动解压.
1.2 安装插件
拷贝插件到 Docker:
docker cp D:\rabbitmq_delayed_message_exchange-20171201-3.7.x.ez rabbit:/plugins
RabbitMQ 在 Docker 的安装, 请参照本系列的上一篇文章: http://www.apigo.cn/2018/09/11/springboot13/
1.3 启动插件
进入 docker 内部:
docker exec -it rabbit /bin/bash
开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查询安装的所有插件:
rabbitmq-plugins list
安装正常, 效果如下图:
重启 RabbitMQ, 使插件生效
docker restart rabbit
三, 代码实现
3.1 配置队列
- import com.example.rabbitmq.mq.DirectConfig;
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import java.util.HashMap;
- import java.util.Map;
- @Configuration
- public class DelayedConfig {
- final static String QUEUE_NAME = "delayed.goods.order";
- final static String EXCHANGE_NAME = "delayedec";
- @Bean
- public Queue queue() {
- return new Queue(DelayedConfig.QUEUE_NAME);
- }
- // 配置默认的交换机
- @Bean
- CustomExchange customExchange() {
- Map<String, Object> args = new HashMap<>();
- args.put("x-delayed-type", "direct");
- // 参数二为类型: 必须是 x-delayed-message
- return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
- }
- // 绑定队列到交换器
- @Bean
- Binding binding(Queue queue, CustomExchange exchange) {
- return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
- }
- }
3.2 发送消息
- import org.springframework.amqp.AmqpException;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessagePostProcessor;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- @Component
- public class DelayedSender {
- @Autowired
- private AmqpTemplate rabbitTemplate;
- public void send(String msg) {
- SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- System.out.println("发送时间:" + sf.format(new Date()));
- rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setHeader("x-delay", 3000);
- return message;
- }
- });
- }
- }
3.3 消费消息
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- @Component
- @RabbitListener(queues = "delayed.goods.order")
- public class DelayedReceiver {
- @RabbitHandler
- public void process(String msg) {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- System.out.println("接收时间:" + sdf.format(new Date()));
- System.out.println("消息内容:" + msg);
- }
- }
3.4 测试队列
- import com.example.rabbitmq.RabbitmqApplication;
- import com.example.rabbitmq.mq.delayed.DelayedSender;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class DelayedTest {
- @Autowired
- private DelayedSender sender;
- @Test
- public void Test() throws InterruptedException {
- SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
- sender.send("Hi Admin.");
- Thread.sleep(5 * 1000); // 等待接收程序执行之后, 再退出测试
- }
- }
执行结果如下:
发送时间: 2018-09-11 20:47:51
接收时间: 2018-09-11 20:47:54
消息内容: Hi Admin.
完整代码访问我的 GitHub:https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq
四, 总结
到此为止我们已经使用 "rabbitmq-delayed-message-exchange" 插件实现了延迟功能, 但是需要注意的一点是, 如果使用命令 "rabbitmq-plugins disable rabbitmq_delayed_message_exchange" 禁用了延迟插件, 那么所有未发送的延迟消息都将丢失.
来源: https://www.cnblogs.com/vipstone/p/9967649.html