上篇文章介绍了使用 Redis 来实现延时任务, 这是一个比较好的方案, 但是这种方式是把 Redis 作为消息队列去使用, 而 Redis 作为消息队列还是有一些缺点的:
Redis 本身没有提供监控, 管理界面, 需要自己去实现. 我们无法方便的知道现在队列的情况, 比如是否有积压, 消费情况是如何的, 生产情况又是如何的.
消息可能被重复消费, 如果是幂等性操作也没什么, 但是如果非幂等性操作, 就需要其他的解决方案来解决这个问题.
Redis 本身没有 ACK 机制, 消息没有那么可靠, 当然这个缺点在这个案例中, 并不是那么明显, 因为我们可以在该执行的都执行成功了, 才去删除数据.
...
当然最根本的问题是 Redis 本身就不是为了队列而生的, 它是为了存储而生的, 所以它缺少一些队列才有的功能也是 "情理之中" 的. 不过, Redis5 引进了 Stream, 据说 这也是一个功能很强大的队列, 但是我还没去看. 这里就不说了.
在本节中, 我将用 RabbitMQ 来实现延时任务.
关于 RabbitMQ 的安装, 我就不做介绍了, 网上都有, 而且没有什么难度.
在使用方面, RabbitMQ 比 Redis 难很多, 毕竟使用的比较少, 而且不少公司都对 MQ 进行了封装, 使其更好用, 但是同时也隐藏了 MQ 在使用方面的不少细节.
从基本没有接触过 RabbitMQ, 到要使用 RabbitMQ 来完成延时任务, 也是一个 "跳跃性" 的任务. 我们应该先了解 RabbitMQ 一些基础概念, 基本使用 等等. 仅仅靠一两句话是远远不够的. 本文的主题在于 "使用 RabbitMQ 来完成延时任务". 所以在这里我默认大家都有一定的 RabbitMQ 使用经验了.
好了, 让我们开始吧.
首先, 让我们引进两个名词:
TTL, 死信:
Time To Live, 这个名词也说不上是一个新名词, Redis 中也有, 就是 存活时间, 也就是我们经常说的过期时间了, 放在 MQ 里面, 特指 消息的存活时间. 消息超过了存活时间, 就认为这个消息 "死" 了, 称之为 "死信".
Dead Letter Exchange
死信交换器. 创建死信交换器和创建其他交换器没什么区别, 只是我们需要告诉队列, 死信需要被推送到死信交换器上.
对于生产者来说, 需要创建一个 Connection 连接, 接着在 Connection 中创建一个 Channel, 通过 Channel 申明两个交换器, 一个是 用来接收订单数据的交换器, 一个是用来接收超时订单数据的交换机, 然后申明两个队列, 一个是订单数据队列, 并且需要告诉这个队列, 如果有消息超时了, 需要转发到 "用来接收超时订单数据的交换机", 还要申明一个超时订单数据队列. 然后把 "用来接收订单数据的交换器" 和 "订单数据队列" 进行绑定, 把 "用来接收超时订单数据的交换机" 和 "超时订单数据队列" 进行绑定. 前置准备工作才算完成, 下面就是通过 Channel 往 "用来接收订单数据的交换器" 推数据了.
为了帮助大家更好的理解, 我简单的画了一张图:
希望大家看了文字之后, 再对照图片, 可以有所理解.
对于生产者来说, 就比较简单了, 前置工作就是创建 Connection 连接, 再创建 Channel, 然后通过 Channel, 消费 "超时订单数据队列" 就 OK 了.
下面我直接放出代码:
需要在 pom 中引入依赖:
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.5.0</version>
- </dependency>
- public class Main {
- static ConnectionFactory connectionFactory;
- static Connection connection;
- static {
- connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("localhost");
- try {
- connection = connectionFactory.newConnection();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) throws Exception {
- producer();
- Thread thread = new Thread(() -> {
- try {
- consume();
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
- thread.start();
- }
- private static void producer() throws Exception {
- Channel channel = connection.createChannel();// 创建一个 channel, 不管是生产数据, 还是消费数据, 都是通过 channel 去操作的
- channel.exchangeDeclare("orderExchange", "direct", true);// 定义一个交换机, 路由类型为 direct, 所有的订单会塞给此交换机
- channel.exchangeDeclare("orderDelayExchange", "direct", true);// 定义一个交换机, 路由类型为 direct, 延迟的订单会塞给此交换机
- HashMap<String, Object> arguments = new HashMap<String, Object>();
- arguments.put("x-dead-letter-exchange", "orderDelayExchange");// 申明死信交换机是名称为 orderDelayExchange 的交换机
- channel.queueDeclare("order_queue", true, false, false,
- arguments);// 定义一个名称为 order_queue 的队列, 绑定上面定义的参数, 这样就告诉 rabbit 此队列延迟的消息, 发送给 orderDelayExchange 交换机
- channel.queueDeclare("order_delay_queue", true, false, false,
- null);// 定义一个名称为 order_delay_queue 的队列
- channel.queueBind("order_queue", "orderExchange",
- "delay");//order_queue 和 orderExchange 绑定, 路由为 delay. 路由也为 delay 的消息会通过 orderExchange 进入到 order_queue 队列
- channel.queueBind("order_delay_queue", "orderDelayExchange",
- "delay");//order_delay_queue 和 orderDelayExchange 绑定
- AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
- builder.expiration("15000");// 设置消息 TTL(消息生存时间)
- builder.deliveryMode(2);// 设置消息持久化
- AMQP.BasicProperties properties = builder.build();
- Thread productThread = new Thread(() -> {
- for (int i = 0; i < 20; i++) {
- String order = "order" + i;
- try {
- channel.basicPublish("orderExchange", "delay",
- properties, order.getBytes());// 通过 channel, 向 orderExchange 交换机发送路由为 delay 的消息, 这样就可以进入到 order_queue 队列
- String str = "现在时间是" + new Date().toString() + "" + order +" 的消息产生了 ";
- System.out.println(str);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- try {
- channel.close();
- } catch (Exception ex) {
- }
- });
- productThread.start();
- }
- private static void consume() throws Exception {
- Channel channel = connection.createChannel();// 创建一个 channel, 不管是生产数据, 还是消费数据, 都是通过 channel 去操作的
- // 消费名称为 order_delay_queue 的队列, 且关闭自动应答, 需要手动应答
- channel.basicConsume("order_delay_queue", false, new DefaultConsumer(channel) {
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- long deliveryTag = envelope.getDeliveryTag();// 消息的标记, 应答的时候需要传入这个参数
- String str = "现在时间是" + new Date().toString() + "" + new String(body) +" 的消息消费了 ";
- System.out.println(str);
- channel.basicAck(deliveryTag, false);// 手动应答, 代表这个消息处理完成了
- }
- });
- }
- }
下面我们运行一下:
代码注释写的还是比较清晰的, 希望大家可以看懂吧.
这一节, 我没有像上两节一样, 讲的那么细, 因为如果从 RabbitMQ 的基础讲起, 可能需要三四章的内容来做铺垫, 这就脱离主题了. 如果有机会的话, 我会再花一个系列去介绍 RabbitMQ.
好了, 实现延时任务系列到这里就结束了, 当然我这里只是抛砖引玉, 大家肯定还有不少更好的实现方式.
来源: https://www.cnblogs.com/CodeBear/p/10056810.html