空间宠物业务需要实现一个定时消息触发组件,如在特定时刻给用户推送收集糖果通知、biubiu 球功能定时回收用户丢弃的球等。可见,消息只有在特定时间到达才能被处理。同时,消息的产生是无序的,即后产生的消息被处理的时间可能早于先产生的消息。
一些著名的消息队列组件,如 ActiveMQ ,本身支持消息延迟投递,为何本文选 择 Redis 呢?一方面是引入新组建有学习、运维、接入成本,而组内已积累一定 Redis 开发运维经验;另一方面则是基于 Redis 实现这样一个组件难度也不大。所以决定采用 Redis。
键空间通知可以在消息到达时插入一个 key,并给 key 设置过期时间,键过期后 会通过特定频道发布键过期通知,订阅方可收到通知并处理事件。但问题在于:
ZSET 可在消息插入时根据 score 排序,从而使最早的消息排在最前面。但 ZSET 没有提供 POP 方法,取得第一个元素和删除需要执行两个命令。为保证原子性,可以采用 事务,如:
- 127.0.0.1:6379> MULTI
- OK
- 127.0.0.1:6379> ZRANGE myzset 0 0 WITHSCORES
- QUEUED
- 127.0.0.1:6379> ZREMRANGEBYRANK myzset 0 0
- QUEUED
- 127.0.0.1:6379> EXEC
- 1) 1) "b"
- 2) "2"
- 2) (integer) 1
或者使用 pipelining,如:
- $ (printf "ZRANGE myzset 0 0 WITHSCORES\r\nZREMRANGEBYRANK myzset 0 0\r\n"; sleep 1) | nc localhost 6379
- *2
- $1
- c
- $1
- 3
- :1
但问题在于,虽然可顺序取出消息,但无法只在时间到达后取出消息。因此需要 client 端实现逻辑等待时间到达再推送。同时,消息产生是无序的,如果取得了一个 10 分钟后处理的消息,在此期间又产生了一个需要在 5 分钟后处理的消息,逻辑将变得复杂。
由于使用原生 Redis 无法满足需求,我们决定扩展 Redis 命令。
LUA 脚本是利用 3.X 版官方特性实现命令扩展的途径。以下脚本 将读出首元素,并与当前时间戳(以参数传入)比较,如果消息处理时间到达则删除消息并返回;所有操作将是原子的。目前我们线上服务使用该方案。
LUA 脚本:
- local rs = redis.call('ZRANGE', KEYS[1], '0', '0', 'WITHSCORES');
- if table.getn(rs)<2 then return rs end;
- if tonumber(rs[2]) < tonumber(ARGV[1]) then
- redis.call('ZREMRANGEBYRANK', KEYS[1], 0, 0);
- return rs
- end;
- return {}
client 生成命令:
- redisFormatCommand( & pCmd, "eval %s 1 %s %lld", szScript, szKey, (int64_t) time(NULL)));
缺点是:
改源码,加一个命令。我们较早上线的一个服务使用了该方案。
- /* 需要在server.c中加入实现的命令:
- struct redisCommand redisCommandTable[] = {
- //......
- {"zlpopif",zlpopifCommand,3,"w",0,NULL,1,1,1,0,0},
- {"zrpopif",zrpopifCommand,3,"w",0,NULL,1,1,1,0,0},
- };
- */
- /* 实现在t_zset.c: */
- void zpopGenericCommand(client * c, int reverse, int condition) {
- robj * key = c - >argv[1];
- robj * zobj;
- int keyremoved = 0;
- unsigned long deleted = 0;
- long start = 0,
- end = 0,
- llen = 0;
- /* for deletion */
- unsigned char * zleptr,
- *zlsptr;
- /* for addReply */
- unsigned char zlvstr[128];
- unsigned int zlvlen = 0;
- long long zlvlong = 0;
- robj * slele;
- double node_score;
- /* Step 1: Lookup & range sanity checks if needed. */
- if ((zobj = lookupKeyWriteOrReply(c, key, shared.czero)) == NULL || checkType(c, zobj, OBJ_ZSET)) return;
- llen = zsetLength(zobj);
- if (end >= llen) end = llen - 1;
- if (start > end || start >= llen) {
- return;
- }
- /* Step 2: Get value of the node will be remove */
- if (zobj - >encoding == OBJ_ENCODING_ZIPLIST) {
- unsigned char * zl = zobj - >ptr;
- unsigned char * vstr;
- if (reverse) zleptr = ziplistIndex(zl, -2 - (2 * start));
- else zleptr = ziplistIndex(zl, 2 * start);
- serverAssertWithInfo(c, zobj, zleptr != NULL);
- zlsptr = ziplistNext(zl, zleptr);
- serverAssertWithInfo(c, zobj, zleptr != NULL && zlsptr != NULL);
- serverAssertWithInfo(c, zobj, ziplistGet(zleptr, &vstr, &zlvlen, &zlvlong));
- /* copy the result, sice the node will be delete before addReply */
- node_score = zzlGetScore(zlsptr);
- if (vstr) strncpy((char * ) zlvstr, (char * ) vstr, zlvlen);
- } else if (zobj - >encoding == OBJ_ENCODING_SKIPLIST) {
- zset * zs = zobj - >ptr;
- zskiplist * zsl = zs - >zsl;
- zskiplistNode * ln;
- /* Check if starting point is trivial, before doing log(N) lookup. */
- if (reverse) {
- ln = zsl - >tail;
- } else {
- ln = zsl - >header - >level[0].forward;
- }
- serverAssertWithInfo(c, zobj, ln != NULL);
- slele = ln - >obj;
- incrRefCount(slele);
- /* MUST call decrRefCount to free mem */
- node_score = ln - >score;
- } else {
- serverPanic("Unknown sorted set encoding");
- }
- /* Step 3: Check if condition satisfied. */
- if (condition) {
- double condscore = 0;
- if (getDoubleFromObjectOrReply(c, c - >argv[2], &condscore, NULL) != C_OK) goto cleanup;
- if (!reverse && condscore < node_score) {
- addReply(c, shared.emptymultibulk);
- goto cleanup;
- }
- if (reverse && condscore > node_score) {
- addReply(c, shared.emptymultibulk);
- goto cleanup;
- }
- }
- /* Step 4: Perform the deletion operation. */
- if (zobj - >encoding == OBJ_ENCODING_ZIPLIST) {
- /* delete by ptr */
- serverAssertWithInfo(c, zobj, zleptr != NULL);
- zobj - >ptr = zzlDelete(zobj - >ptr, zleptr);
- deleted = 1;
- /* delete by range */
- /*zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);*/
- if (zzlLength(zobj - >ptr) == 0) {
- dbDelete(c - >db, key);
- keyremoved = 1;
- }
- } else if (zobj - >encoding == OBJ_ENCODING_SKIPLIST) {
- zset * zs = zobj - >ptr;
- /* delete by ptr */
- /*serverAssertWithInfo(c,zobj,slele != NULL);
- serverAssertWithInfo(c,zobj,zslDelete(zs->zsl,node_score,slele));
- dictDelete(zs->dict,slele);
- deleted = 1;*/
- /* delete by range */
- deleted = zslDeleteRangeByRank(zs - >zsl, start + 1, end + 1, zs - >dict);
- if (htNeedsResize(zs - >dict)) dictResize(zs - >dict);
- if (dictSize(zs - >dict) == 0) {
- dbDelete(c - >db, key);
- keyremoved = 1;
- }
- } else {
- serverPanic("Unknown sorted set encoding");
- }
- /* Step 5: Notifications and reply. */
- if (deleted) {
- signalModifiedKey(c - >db, key);
- notifyKeyspaceEvent(NOTIFY_ZSET, "zrem", key, c - >db - >id);
- /* we reuse the built in "zrem" keyspace event for pop operation! */
- if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c - >db - >id);
- server.dirty += deleted;
- }
- /* Step 6: Return the result in form of a multi-bulk reply */
- if (deleted) {
- addReplyMultiBulkLen(c, 2);
- /* at most one element with score */
- if (zobj - >encoding == OBJ_ENCODING_ZIPLIST) {
- if (zlvlen == 0) addReplyBulkLongLong(c, zlvlong);
- else addReplyBulkCBuffer(c, zlvstr, zlvlen);
- addReplyDouble(c, node_score);
- } else if (zobj - >encoding == OBJ_ENCODING_SKIPLIST) {
- addReplyBulk(c, slele);
- addReplyDouble(c, node_score);
- }
- } else {
- addReply(c, shared.emptymultibulk);
- }
- cleanup: ;
- if (zobj - >encoding == OBJ_ENCODING_SKIPLIST) decrRefCount(slele);
- }
- void zlpopifCommand(client * c) {
- zpopGenericCommand(c, 0, 1);
- }
- void zrpopifCommand(client * c) {
- zpopGenericCommand(c, 1, 1);
- }
缺点是:后续官方更新都需要改代码。
使用 [Redis 4.0 模块实现。此处是 GitHub 传送门。
相比前两种方法,此方法逻辑收归在服务端,且不需要修改 Redis 源码便于升级。但需要注意资源释放、复制机制等细节,谨防踩坑。
1 . 兼容性:要求所有从机、或加载 AOF/RDB 的实例均实现了新的命令,即均为修改版 Redis 或均加载了扩展模块。
2 . 命令写入 AOF 和从机的时机:
3 . 消息处理失败处理:ZSET 中消息被 pop 后才被 client 取得处理,若 client 处理失败则需要 client 在保证幂等的前提下自行重试。
来源: https://cloud.tencent.com/developer/article/1005722?fromSource=gwzcw.705991.705991.705991