背景
相信我们或多或少的会遇到类似下面这样的需求:
第三方给了一批数据给我们处理, 我们处理好之后就通知他们处理结果.
大概就是下面这个图说的.
本来在处理完数据之后, 我们就会马上把处理结果返回给对方, 但是对方要求我们处理速度不能过快, 要有一种人为处理的效果.
换句话就是说, 就算是处理好了, 也要晚一点再执行通知操作.
这就是一个典型的延时任务.
延时, 那还不简单, 执行完之后, 让它 Sleep 一下就好了, 这样就达到目标了.
Sleep 一下确定是最容易实现的一种方案, 但是试想一下, 数据的数量不断的增加, 这样 Sleep 真的好吗? 答案是否定的.
延时队列, 是处理这个场景最为妥当的方案.
RabbitMQ,RocketMQ,Cmq 等都可以直接或间接的达到相应的效果.
如果不具备队列条件, 又要怎么处理呢? 还可以借助 Redis 来完成这项工作.
MQ 不一定每个公司都会用, 但 Redis 应该 80% 以上的都会用吧.
处理方案
Redis 这边, 可用的方案有两种, 下面分别来介绍一下.
#1 键的过期时间
在设置缓存的时候, 我们比较多情况下都会设置一个缓存的过期时间, 这个时间过期后, 会重新去数据源拿数据回来.
可以基于这个过期时间结合 Redis 的 keyspace notifications 共同完成.
keyspace notifications 里面包含了非常多的事件, 这里只关注 EXPIRE, 这个是和过期有关的.
只要订阅了__keyevent@0__:expired 这个主题, 当有 key 过期的时候, 就会收到对应的信息.
注: 主题 @后面的 0, 指的是 db 0.
要想使用这个特性, 必不可少的一步是修改 Redis 默认的配置, 把 notify-keyspace-events 设置成 Ex.
- ############################# Event notification ##############################
- # Redis can notify Pub/Sub clients about events happening in the key space.
- # This feature is documented at http://redis.io/topics/notifications
- #
- # .........
- #
- # By default all notifications are disabled because most users don't need
- # this feature and the feature has some overhead. Note that if you don't
- # specify at least one of K or E, no events will be delivered.
- notify-keyspace-events "Ex"
其中 E 指的是键事件通知, x 指的是过期事件.
根据这个特性, 重新调整一下流程图:
应该也比较好懂, 下面通过简单的代码来实现一下这种方案.
首先是处理完数据及往 Redis 写数据.
- public async Task DoTaskAsync()
- {
- // 数据处理
- // ...
- // 后续操作要延时, 把 Id 记录下来
- var taskId = new Random().Next(1, 10000);
- // 要延迟的时间
- int sec = new Random().Next(1, 5);
- // 可以加个重试机制, 预防单次执行失败.
- await RedisHelper.SetAsync($"task:{taskId}", "1", sec);
- }
还需要回传结果的后台任务, 这个任务就是去订阅上面说的键过期事件, 然后回传结果.
这里可以借助 BackgroundService 来订阅处理.
- public class SubscribeTaskBgTask : BackgroundService
- {
- protected override Task ExecuteAsync(CancellationToken stoppingToken)
- {
- stoppingToken.ThrowIfCancellationRequested();
- var keyPrefix = "task:";
- RedisHelper.Subscribe(
- ("__keyevent@0__:expired", arg =>
- {
- var msg = arg.Body;
- Console.WriteLine($"recive {msg}");
- if (msg.StartsWith(keyPrefix))
- {
- // 取到任务 Id
- var val = msg.Substring(keyPrefix.Length);
- Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");
- // 回传处理结果给第三方, 这里可以考虑这个并发锁, 避免多实例都处理了这个任务.
- // ....
- }
- }
- ));
- return Task.CompletedTask;
- }
- }
这里有一个要注意的地方, 要在 key 里面包含任务的 Id, 因为订阅处理的时候, 只能拿到一个 key, 后续能做的操作也只是基于这个 key.
上面的例子, 是用了 task: 任务 Id 的形式, 所以在订阅处理的时候, 只处理以 task: 开头的那些 key.
效果如下:
这种方案, 直观上是非常简单的, 不过这种方案会遇到一个小问题.
当一个 key 过期后, 并不一定会马上收到通知, 这个也是会有一定的延时的, 取决于 Redis 的内部机制.
Redis Keyspace Notifications 文档的最后一段也提到了这个问题.
所以用这种方案的时候, 要考虑一下, 你的延时是不是要及时~~
#2 有序集合
有序集合是 Redis 中一种十分有用的数据结构, 它的本质其实就是集合加了一个排序的功能, 每个集合里面的元素还会有一个分值的属性.
它提供了一个可以获取指定分值范围内的元素, 这个也就是我们的出发点.
在这个场景下, 什么东西可能作为这个分值呢? 现在只有一个处理任务的 Id 还有一个延迟的时间, Id 肯定不行, 那么也只能是延迟时间来作这个分值了.
延迟 1 秒, 5 秒, 1 分钟, 这个都是比较大粒度的时间, 这里要转化一下, 用时间戳来代替这些延迟的时间.
假设现在的时间戳是 1584171520, 要延迟 5 秒执行, 那么执行任务的时间就是 1584171525, 在当前时间戳的基础上加个 5 秒, 就是最终要执行的了.
到时有序集合中存的元素就会是这样的
任务 Id-1 1584171525
任务 Id-2 1584171528
任务 Id-3 1584171530
接下来就是要怎么取出这些任务的问题了!
把当前时间戳当成是取数的最大分值, 0 作为最小分值, 这个时候取出的元素就是应该要执行回传的任务了.
根据这个方案, 重新调整一下流程图:
交代清楚了思路, 再来点代码, 加深一下理解.
首先还是处理完数据后往 Redis 写数据.
- public async Task DoTaskAsync()
- {
- // 数据处理
- // ...
- // 后续操作要延时, 把 Id 记录下来
- var taskId = new Random().Next(1, 10000);
- var cacheKey = "task:delay";
- int sec = new Random().Next(1, 5);
- // 要执行这个任务的时间戳
- var time = DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds();
- await RedisHelper.ZAddAsync(cacheKey, (time, taskId));
- Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} done {taskId} here - {sec}");
- }
后面就是轮训有序集合里面的元素了, 这里同样是借助 BackgroundService 来处理.
- public class SubscribeTaskBgTask : BackgroundService
- {
- protected override async Task ExecuteAsync(CancellationToken stoppingToken)
- {
- stoppingToken.ThrowIfCancellationRequested();
- var cacheKey = "task:delay";
- while (true)
- {
- // 先取, 后删, 不具备原子性, 可考虑用 lua 脚本来保证原子性.
- var vals = await RedisHelper.ZRangeByScoreAsync(cacheKey, -1, DateTimeOffset.Now.ToUnixTimeSeconds(), 1, 0);
- if (vals != null && vals.Length> 0)
- {
- var val = vals[0];
- var rmCount = await RedisHelper.ZRemAsync(cacheKey, vals);
- if (rmCount> 0)
- {
- // 要把这个元素先删除成功了, 再执行任务, 不然会重复
- Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");
- // 回传处理结果给第三方, 这里可以考虑这个并发锁, 避免多实例都处理了这个任务.
- // ....
- }
- }
- else
- {
- // 没有数据, 休眠 500ms, 避免 CPU 空转
- await Task.Delay(500);
- }
- }
- }
- }
效果如下:
参考文章
- https://redis.io/topics/notifications
- https://zhuanlan.zhihu.com/p/87113913
来源: https://www.cnblogs.com/catcher1994/p/12496319.html