1. 使用场景
在很多系统中,特别是电商系统常常存在需要执行延迟任务.例如一个待支付订单,需要在 30 分钟后自动关闭.虽然有很多方式可以实现,比如说 Job 等,这里主要介绍利用 Redis 的新特性 keyspace notifications 来实现.
2. 基础知识
重点!!! Redis 2.8.0 版本开始支持 keyspace notifications.如果你的 Redis 版本太低,可以洗洗睡了……
如果你还不了解 Redis 的 Pub/Sub,强烈建议你先阅读该篇文章: Redis 发布与订阅
接下来说说我们的主角:keyspace notifications
keyspace notifications 默认是关闭状态,开启则需要修改 redis.conf 文件或通过 CONFIG SET 来开启或关闭该功能.这里我们使用 CONFIG SET 来开启:
$ redis - cli config set notify - keyspace - events Ex
这里有人会问了, Ex 是什么意思呢?这是 notify-keyspace-events 的参数,完整的参数列表看看下面的表格:
字符 发送的通知
键空间通知,所有通知以 为前缀
键事件通知,所有通知以 为前缀
, , 等类型无关的通用命令的通知
字符串命令的通知
列表命令的通知
集合命令的通知
哈希命令的通知
有序集合命令的通知
过期事件:每当有过期键被删除时发送
驱逐 (evict) 事件:每当有键因为 政策而被删除时发送
参数 的别名
可以看出,我们只开启了键事件通知和过期事件.因为我们实现延时任务只需要这两个就足够了.话不多说,直接看代码.
3. 实现方案
一个延迟任务应该具备哪些属性? 我觉得至少有以下属性:
任务类型.(例如:关闭订单)
任务 ID.(例如:订单 ID)
任务延迟时间.(例如:30 分钟)
任务额外数据.(例如:订单其他相关数据)
确定好后,我们可以继续往下走.
3.1 注册事件处理器
首先在工程启动后,我们需要根据不同的事件注册不同的处理器:
3.2 创建延迟任务
const _ = require('lodash')
// 任务处理器map
const handlers = {}
// 事件类型map
const events = {}
const registerEventHandler = (type, handler) = >{
if (!type) {
throw new Error('type不能为空')
}
if (!_.isFunction(handler)) {
throw new Error('handler类型非function')
}
handlers[type] = handler events[type] = true
}
这里比较关键的点就是 client.setex(key, expired, value) 这个方法,我们需要给 key 添加一个过期时间,那么当 key 过期后 redis 才会发出一个过期事件.
const redis = require('redis')
const client = redis.createClient()
const eventKeyPrefix = 'custom_event_'// 任务列表
const jobs = {}
const addDelayEvent = (type, id, body = {}, delay = 10 * 60) => {
const key = `${eventKeyPrefix}${type}_${id}`
const jobKey = `${type}_${id}`
client.setex(key, delay, 'delay event', (err) => {
if (err) {
return console.log('添加延迟事件失败:', err);
}
console.log('添加延迟事件成功');
jobs[jobKey] = body
})
}
3.3 订阅过期事件
实现了前两个步骤后,我们已经可以往 redis 里写入带有过期时间的 key 了.接下来关键的就是订阅过期事件并处理.
在很多系统中,特别是电商系统常常存在需要执行延迟任务.例如一个待支付订单,需要在30分钟后自动关闭.虽然有很多方式可以实现,比如说Job等,这里主要介绍利用Redis的新特性 keyspace notifications来实现.
const redis = require('redis')
const sub = redis.createClient()
sub.on('pmessage', (pattern, channel, message) => {
// match key
const keyMatcher = new RegExp(`^${eventKeyPrefix}(${_.keys(events).join('|')})_(\\S+)
'lodash'
'type不能为空'Redis 2.8.0版本开始支持 keyspace notifications.如果你的Redis版本太低,可以洗洗睡了……
如果你还不了解Redis的Pub/Sub,强烈建议你先阅读该篇文章: 'handler类型非function'
接下来说说我们的主角:keyspace notifications
keyspace notifications默认是关闭状态,开启则需要修改redis.conf文件或通过CONFIG SET来开启或关闭该功能.这里我们使用CONFIG SET来开启:
'redis'
这里有人会问了, Ex 是什么意思呢?这是notify-keyspace-events的参数,完整的参数列表看看下面的表格:
'delay event'
可以看出,我们只开启了键事件通知和过期事件.因为我们实现延时任务只需要这两个就足够了.话不多说,直接看代码.
'delay event'
一个延迟任务应该具备哪些属性? 我觉得至少有以下属性:
'delay event'
确定好后,我们可以继续往下走.
'delay event'
首先在工程启动后,我们需要根据不同的事件注册不同的处理器:
'添加延迟事件失败:''添加延迟事件失败:''添加延迟事件失败:'
这里比较关键的点就是client.setex(key, expired, value)这个方法,我们需要给key添加一个过期时间,那么当key过期后redis才会发出一个过期事件.
'添加延迟事件失败:'
实现了前两个步骤后,我们已经可以往redis里写入带有过期时间的key了.接下来关键的就是订阅过期事件并处理.
3.4 编写 Demo
) const result = message.match(keyMatcher) if (result) {
const type = result[1];
const id = result[2];
const handler = handlers[type] console.log('订阅消息:type=%s, id=%s', type, id);
if (_.isFunction(handler)) {
const jobKey = `$ {
type
}
_$ {
id
}`
if (jobs[jobKey]) {
handler(id, jobs[jobKey])
} else {
console.log('未找到延迟事件,type=%s,id=%s', type, id);
}
} else {
console.log('未找到事件处理器.type=%s', type)
}
}
})
// 订阅频道
sub.psubscribe('__key*__:expired')
最后我们写一个 Demo 来验证下我们的功能.
const eventManager = require('./utils/eventManager') eventManager.registerEventHandler('closeorder', (id, body) = >{
console.log('关闭订单 id=%s, body=%o', id, body);
}) eventManager.addDelayEvent('closeorder', 1111, {
name: 'test',
5)
Done!
来源: https://juejin.im/post/5a5ae6f7f265da3e5a5738f6