[TOC]
一, 业务场景
所谓延时队列就是延时的消息队列, 下面说一下一些业务场景比较好理解
1.1 实践场景
订单支付失败, 每隔一段时间提醒用户
用户并发量的情况, 可以延时 2 分钟给用户发短信
...
1.2 实现方式
这些情况都可以使用延时队列来做, 实现延时队列比较场景的有使用消息队列 MQ 来实现, 比如 RocketMQ 等等, 也可以使用 Redis 来实现, 本博客主要介绍一下 Redis 实现延时队列
二, Redis 延时队列
2.1 Redis 列表实现
Redis 实现延时队列可以通过其数据结构列表 (list) 来实现, 顺便复习一下 Redis 的列表, 实现列表, Redis 可以通过队列和栈来实现:
- /* 队列: First in first out */
- // 加两个 value
- >rpush keynames key1 key2
- 2
- // 计算
- >llen keynames
- 2
- >lpop keynames
- key1
- >lpop keynames
- key2
- //rpush 会自动过期的
- >rpop keynames
- NULL
- /* 栈: First in last out */
- // 同样, 加两个元素
- >rpush keynames key1 key2
- 2
- >rpop keynames
- key2
- >rpop keynames
- key1
对于 Redis 的基本数据结构, 可以参考我之前的博客:
然后怎么实现延时? Thread 睡眠或者线程 join? 这种方法是可以实现, 不过假如用户一多? 10 个请求就要延时 10N 了, 这种情况系统性能不好的话就会出现线程阻塞了的情况.
队列空了的情况? 就会出现 pop 的死循环, 这种情况很可怕, 很吃系统 CPU, 虽然可以通过线程睡眠方法来缓解, 但不是最好的方法
这时候就要介绍一下 Redis 的 blpop/brpop 来替换 lpop/rpop,blpop/brpop 阻塞读在队列没有数据的时候, 会立即进入休眠状态, 一旦数据到来, 则立刻醒过来. 消息的延迟几乎为零
2.2 Redis 集合实现
Redis 的有序集合 (zset) 也可以用于实现延时队列, 消息作为 value, 时间作为 score, 这里顺便复习一下 Redis 的有序集合
- //9.0 是 score 也就是权重
- >zadd keyname 9.0 math
- 1
- >zadd keyname 9.2 history
- 1
- // 顺序
- >zrange keyname 0 -1
- 1) history
- 2) math
- // 逆序
- >zrevrange keyname 0 -1
- 1) math
- 2) history
- // 相当于 count()
- >zcard keyname
- 2
获取指定 key 的 score
- >zscore keyname math
- 9
然后多个线程的环境怎么保证任务不被多个线程抢了? 这里可以使用 Redis 的 zrem 命令来实现
Redis Zrem 命令用于移除有序集中的一个或多个成员, 不存在的成员将被忽略.
当 key 存在但不是有序集类型时, 返回一个错误.
注意: 在 Redis 2.4 版本以前, ZREM 每次只能删除一个元素.
下面给出来自《Redis 深度历险: 核心原理与应用实践》小册的例子: 例子就是用有序集合和 zrem 来实现的
- import java.lang.reflect.Type;
- import java.util.Set;
- import java.util.UUID;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.TypeReference;
- import Redis.clients.jedis.Jedis;
- public class RedisDelayingQueue<T> {
- static class TaskItem<T> {
- public String id;
- public T msg;
- }
- // fastjson 序列化对象中存在 generic 类型时, 需要使用 TypeReference
- private Type TaskType = new TypeReference<TaskItem<T>>() {
- }.getType();
- private Jedis jedis;
- private String queueKey;
- public RedisDelayingQueue(Jedis jedis, String queueKey) {
- this.jedis = jedis;
- this.queueKey = queueKey;
- }
- public void delay(T msg) {
- TaskItem<T> task = new TaskItem<T>();
- task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
- task.msg = msg;
- String s = JSON.toJSONString(task); // fastjson 序列化
- jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
- }
- public void loop() {
- while (!Thread.interrupted()) {
- // 只取一条
- Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
- if (values.isEmpty()) {
- try {
- Thread.sleep(500); // 歇会继续
- } catch (InterruptedException e) {
- break;
- }
- continue;
- }
- String s = values.iterator().next();
- if (jedis.zrem(queueKey, s)> 0) { // 抢到了
- TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
- this.handleMsg(task.msg);
- }
- }
- }
- public void handleMsg(T msg) {
- System.out.println(msg);
- }
- public static void main(String[] args) {
- Jedis jedis = new Jedis();
- RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
- Thread producer = new Thread() {
- public void run() {
- for (int i = 0; i < 10; i++) {
- queue.delay("codehole" + i);
- }
- }
- };
- Thread consumer = new Thread() {
- public void run() {
- queue.loop();
- }
- };
- producer.start();
- consumer.start();
- try {
- producer.join();
- Thread.sleep(6000);
- consumer.interrupt();
- consumer.join();
- } catch (InterruptedException e) {
- }
- }
- }
不过在多线程环境, 是很难做控制的, 上面例子也有缺陷, 下面引用小册的说法:
上面的算法中同一个任务可能会被多个进程取到之后再使用 zrem 进行争抢, 那些没抢到的进程都是白取了一次任务, 这是浪费. 可以考虑使用 lua scripting 来优化一下这个逻辑, 将 zrangebyscore 和 zrem 一同挪到服务器端进行原子化操作, 这样多个进程之间争抢任务时就不会出现这种浪费了.
来源: http://www.jianshu.com/p/8e82cdcc2863