前言
本篇会继续讲解 Sikiro.SMS.Job 服务的实现, 在我写第一篇的时候, 我就发现我当时设计的架构里 Sikiro.SMS.Job 这个可以选择不需要, 而使用 MQ 代替. 但是为了说明调度任务使用实现也坚持写了下. 后面会一篇针对架构, 实现优化的讲解.
源码地址: https://github.com/SkyChenSky/Sikiro.SMS
Quartz 的简介
Quartz.NET 是一款功能齐全的开源作业调度框架, 小至的应用程序, 大到企业系统都可以适用. Quartz 是作者 James House 用 JAVA 语言编写的, 而 Quartz.NET 是从 Quartz 移植过来的 C# 版本.
Quartz.Net 的作用
Quartz.Net 是多线程的, 允许多个 JOB 同时执行.
Quartz.Net 可以进行持久化, 结合管理后台可以进行可视化的监控
Quartz.Net 提供 API 进行远程操控, 结合管理后台可以进行运维管理
在一般企业, 可以利用 Quartz.Net 框架做各种的定时任务, 例如, 数据迁移, 跑报表等等.
Cron 表达式
字段名 | 是否必填 | 值范围 | 特殊字符 |
---|---|---|---|
Seconds | YES | 0-59 | , - * / |
Minutes | YES | 0-59 | , - * / |
Hours | YES | 0-23 | , - * / |
Day of month | YES | 1-31 | , - * ? / L W |
Month | YES | 1-12 or JAN-DEC | , - * / |
Day of week | YES | 1-7 or SUN-SAT | , - * ? / L # |
Year | NO | empty, 1970-2099 | , - * / |
缺点
Quartz.Net 的缺点很明显, 没有自带的管理后台, 而同款的 Hangfir 调度任务框架则会有更加良好的易用性. 但是在 Github 上有不少人开源了 Quartz.Net 的管理后台, 对此作为了弥补.
其他
其他 Quartz.Net 的信息可以看我之前记录的一篇文章《Quartz.NET 的使用(附源码)》
Quartz.Net DEMO:https://github.com/SkyChenSky/QuartzDotNetDemo.git
业务流程
从 MongoDB 持久化的数据, 查询出状态为待处理并且定时时间小于当前时间的数据. 通过 Mongo 驱动提供的 FindOneAndUpdate 对文档进行原子性操作(更新中间状态并查询出刚更新的文档). 如果有数据则发送到 MQ, 由 Sikiro.SMS.Bus 进行订阅发送, 因为本次有数据, 我认为可能还会有其他需要发送的数据, 因此立刻调用 JOB 自身方法, 进行下一条需要处理的数据进行发送. 如果此次 JOB 的执行并没有数据, 那么认为接下来一段时间没有需要处理的数据, 这次调度结束.
TimeSendSms 示例
- public class TimeSendSms : BaseJob
- {
- private readonly SmsService _smsService;
- private readonly IBus _bus;
- public TimeSendSms(SmsService smsService, IBus bus)
- {
- _smsService = smsService;
- _bus = bus;
- }
- protected override void ExecuteBusiness()
- {
- _smsService.GetToBeSend();
- if (_smsService.Sms != null)
- _bus.Publish(_smsService.Sms.MapTo<SmsModel, SmsQueueModel>());
- _smsService.ContinueDo(ExecuteBusiness);
- }
- protected override void OnException()
- {
- _smsService.RollBack();
- }
- }
模板模式
Job 的轮询处理流程基本相似, 查询出需要执行数据 - 遍历业务处理 - 如果有异常则特殊处理, 因此针对类似流程相同, 但是实现有差异的程序, 我们可以使用模板模式.
- public abstract class BaseJob : IJob
- {
- private void OnException(Action action)
- {
- try
- {
- action();
- }
- catch (Exception e)
- {
- e.WriteToFile();
- OnException();
- }
- }
- public Task Execute(IJobExecutionContext context)
- {
- OnException(ExecuteBusiness);
- return null;
- }
- protected virtual void OnException()
- {
- }
- protected abstract void ExecuteBusiness();
- }
Mongo 的原子性
原子性
原子是物理概念, 指的是指化学反应不可再分的基本微粒. 而计算机领域的原子性强调的对象是操作(指令, 事务). 我们所说的指令组是原子操作, 意思要么一起成功, 要么一起失败. 不允许 2 个指令里, 一个成功一个失败的情况存在.
MongoDB 原子操作
MongoDB 的原子操作就是要么这个文档完整的保存到 Mongodb, 要么没有保存到 Mongodb, 不会出现查询到的文档没有保存完整的情况.
MongoDB 的文档的保存, 修改, 删除等操作都是原子性, 除此之外还提供了 FindOneAndDelete,FindOneAndUpdate,FindOneAndReplace 等原子操作.
以 FindOneAndUpdate 为例, 对某文档 FindOneAndUpdate, 可以文档 B 进行 Update 操作完成后返回出文档 B 的结果, 根据参数返回结果是更新前还是更新后(一般我们需要更新后).
而这 FindOneAndUpdate 的操作对于我们更新到中间状态的非常实用:
避免进行 Update 后无法良好的查询到刚 Update 的文档
避免应用集群部署时批量更新后, 无法良好分配任务
批量更新多个文档需要 isolated 标识隔离, 全局锁在大并发情况下性能并不乐观
虽然以上可以通过更新时标识版本号进行解决, 这无疑增加实现难度.
MongoDB 锁机制
Mongodb 并发操作又读写锁来进行控制.
简单来说
当进行读操作的时候会加读锁, 这个时候其他读操作可以也获得读锁, 但是不能加写锁, 也就是说不能进行写操作.
当进行写操作的时候会加写锁, 这个时候其他操作无法加任何锁, 也就是说不能进行其他的读操作和写操作.
多个 JOB 的并发性
综上所述, 落实到我们应用场景, 在部署多个调度任务服务, 或者 JOB 多个线程去跑时, 我们可以使用 FindOneAndUpdate, 每个调度任务每次只处理一个文档, Update 操作的时候会进行写锁阻塞其他进程 (进程) 的写操作. 那么就可以保证每个调度任务都可以只处理唯一一个有效的文档, 避免重复处理.
下面是我的 Sikiro.Nosql.Mongo 的 FindOneAndUpdate 封装示例, 因为 Update 字段的不友好, 所以我封装了一下 Lambda 表达式, ReturnDocument =ReturnDocument.After 标识响应数据是更新前还是更新后的文档.
- public T GetAndUpdate<T>(string database, string collection, Expression<Func<T, bool>> predicate, Expression<Func<T, T>> updateExpression)
- {
- var db = _mongoClient.GetDatabase(database);
- var col = db.GetCollection<T>(collection);
- var updateDefinitionList = MongoExpression<T>.GetUpdateDefinition(updateExpression);
- var updateDefinitionBuilder = new UpdateDefinitionBuilder<T>().Combine(updateDefinitionList);
- return col.FindOneAndUpdate(predicate, updateDefinitionBuilder, new FindOneAndUpdateOptions<T, T>
- {
- ReturnDocument = ReturnDocument.After
- });
SQL Server 的 UpdateSelect
SQL Server 的操作也具有上述 FindOneAndUpdate 的功能, 我们公司成他为 UpdateSelect, 下面是示例代码:
- UPDATE TOP ( 100 )
- SYS_USER WITH ( UPDLOCK, READPAST )
- SET USER_STATUS = 1
- OUTPUT INSERTED.[USER_NAME] ,
- INSERTED.SYS_USERID ,
- INSERTED.EMAIL
- FROM SYS_USER
- WHERE CREATE_DATETIME < '2018-09-13'
- AND USER_STATUS = 2;
结尾
本篇介绍了调度任务结合 MongoDB 原子操作的使用, 使得调度任务服务可以具有良好的伸缩性. 如果有任何建议与问题可以在下方评论反馈给我.
来源: https://www.cnblogs.com/skychen1218/p/9621844.html