延迟队列, 也就是一定时间之后将消息体放入队列, 然后消费者才能正常消费. 比如 1 分钟之后发送短信, 发送邮件, 检测数据状态等.
Redisson Delayed Queue
如果你项目中使用了 redisson, 那么恭喜你, 使用延迟队列将非常的简单.
基于 Redis 的 Redisson 分布式延迟队列 (Delayed Queue) 结构的 RDelayedQueue Java 对象在实现了 RQueue 接口的基础上提供了向队列按要求延迟添加项目的功能. 该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略.
- RQueue<String> distinationQueue = ...
- RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
- // 10 秒钟以后将消息发送到指定队列
- delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
- // 一分钟以后将消息发送到指定队列
- delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
在该对象不再需要的情况下, 应该主动销毁. 仅在相关的 Redisson 对象也需要关闭的时候可以不用主动销毁.
Java DelayQueue
DelayQueue 它本质上是一个队列, 而这个队列里也只有存放 Delayed 的子类才有意义.
延迟队列 demo
- public class DelayTask implements Delayed {
- private long startDate;
- public DelayTask(Long delayMillions) {
- this.startDate = System.currentTimeMillis() + delayMillions;
- }
- @Override
- public int compareTo(Delayed o) {
- Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
- }
- @Override
- public long getDelay(TimeUnit unit) {
- return this.startDate - System.currentTimeMillis();
- }
- public static void main(String[] args) throws Exception {
- BlockingQueue<DelayTask> queue = new DelayQueue<>();
- DelayTask delayTask = new DelayTask(1000 * 5L);
- queue.put(delayTask);
- while (queue.size()>0){
- queue.take();
- }
- }
- }
延迟队列消费原理
源码中出现了三次 await 字眼:
第一次是当队列为空时, 等待;
第二次等待是因为, 发现有任务, 没有到执行时间, 并且有准备执行的线程(leader), 那不好意思, 还得接续等待直到下一个可执行的任务.
第三次是真正延时的地方了, available.awaitNanos(delay), 此时也没有别的线程要执行, 也就是我将要执行, 等待剩下的延迟时间即可.
延迟队列生产原理
为保证消费者正常消费, 如果优先队列头元素和当前放入元素相等, 则说明当前元素消费的优先级高, 重置准备消费的线程 (leader) 为 null, 唤醒消费者线程重新执行 take 方法逻辑.
手写一个 Redis 延迟队列
Redis 延迟队列设计
延迟消息体设计
延迟消息体 Message 实现了 Delayed 接口, 这样 Java DelayQueue 就知道什么时候取出消息体.
Redis 延迟队列实现
RedisDelayQueue 构造函数依赖 Redis 操作缓存服务对象和目标队列名称(Redis key).
offer 方法传入 member(具体消息),delay(延迟时间),timeUnit(时间单位), 然后封装成延迟消息体 Message 对象, 放入 Java DelayQueue 中.
run 方法是一个循环体, 不断的从 Java DelayQueue 对象中获取消息体, 然后放入 Redis 对应的目标队列里.
延迟队列测试 demo
控制台打印效果
思考
这种方案实现比较简单, 使用的时候一定要谨慎, 应用于延迟小, 消息量不大的场景是没问题的, 毕竟 Java DelayQueue 是占用内存的. 另外也可以考虑利用 Redis 的 sorted set 结构实现延迟队列, 使用 TimeStamp 作为 score, 比如你的任务是要延迟 5 分钟, 那么就在当前时间上加 5 分钟作为 score , 轮询任务每秒只轮询 score 大于当前时间的 key 即可, 如果任务支持有误差, 那么当没有扫描到有效数据的时候可以休眠对应时间再继续轮询.
来源: https://www.cnblogs.com/hujunzheng/p/12587572.html