微服务的概念可以说给程序设计打开了一个新世界, 带来了众多的优点, 但是也将一些以往容易处理的问题变得复杂, 例如: 缓存, 事务, 定时任务等. 缓存可以用中间件例如 Redis,Memcached 等, 事务有诸多分布式事务框架解决, 定时任务也有分布式的解决方案, 例如 quartz,elastic job 等, 今天我要讲的是就是定时任务.
既然已经有成熟的分布式定时任务框架, 我要讲的东西并不是用另一种设计去实现相同的功能, 而是从不同的角度去解决分布式定时任务的问题.
问题来源
这个问题来起源于一个小功能, 我们有一个发送短信的微服务, 需要获取短信的状态报告, 状态报告对于短信发送不是同步的, 短信提交到服务商, 服务商要提交运营商发送之后才能生成状态报告, 因此有一定的延迟, 需要异步获取, 并且服务商提供的接口有频率限制, 因此需要做一个定时任务, 且需要单点执行, 那么问题来了, 因为这一个功能我就需要引入一个定时任务框架吗, 总感觉有点大材小用的意思.
之前我们的定时任务处理既有用过 quartz, 也用过 elastic job, 但是只为这样一个小功能就引入一个框架, 再加上配置又得好半天, 想想都不划算.
例如要用 quartz, 要创建一堆数据库表, 但表里面只存储了一个任务信息.
用 elastic job 吧, 还要使用 zookeeper, 即便用 lite 版, 也需要一堆配置, 远比我写业务的时间要长.
我只想简简单单的写逻辑!!!
解决方案
谈分布式解决方案大致总离不开中间件, 联想到上次解决 websocket 的分布式方案 (参见 Spring Cloud 微服务架构下的 WebSocket 解决方案) 使用到的 Spring Cloud Stream, 大概有了思路:
我需要一个任务分发中心, 专门负责触发定时任务
其他服务如果需要触发定时任务, 接收特定的触发消息
任务执行完成向任务分发中心推送任务完成的确认消息
为任务执行端提供一个公共的 spring boot starter 晚上 2,3 的步骤, 实际需要编码的几乎就剩下业务逻辑本身了
详细设计
根据上一步的方案, 需要确认一些细节, 以及一些特殊的情况, 例如定时任务可能是由微服务集群中单个实例执行, 也可能存在集体执行(例如更新内存中的缓存), 还可能存在分区执行.
客户端 (需要定时任务的为服务端) 需要建立以下消息队列:
集群接收的队列, 每个微服务实例建立一个, 每个微服务实例都会收到相同消息
单独接收的队列, 每个应用集群建立一个, 确保消息只被一个实例消费
按分区接收的队列, 每个分区建立一个, 确保只被分区内一个实例消费
客户端与服务端需要通过唯一的任务 id 来确认需要执行的定时任务
服务端 (任务分发微服务) 需要根据情况将消息推送到不同的队列, 不能直接使用 Spring Cloud Stream, 需要使用 rabbitmq
服务端本身也是分布式的, 因此需要一个定时任务框架用于任务触发, 我这里选择了 quartz
代码实现
Spring Cloud Stream 的基本知识我不再复述了, Spring Cloud 微服务架构下的 WebSocket 解决方案中有讲解.
定时任务分发服务
定义定时任务
- data class ScheduleTask(
- /** 任务的 id, 全局唯一, 与客户端的 taskId 完全匹配 */
- var taskId: String = "",
- /** 定时任务的 cron 表达式 */
- var cron: String = "",
- /** 关联应用 */
- var appId: Int = 0,
- /** 任务描述 */
- var description: String = "",
- /** 接收任务的分区 */
- var zone: String? = null,
- /** 调度方式, 广播到集群或单例执行, 默认单例 */
- var dispatchMode: DispatchMode = DispatchMode.Singleton,
- /** 是否启用 */
- var enabled: Boolean = true,
- /** 任务的数据库记录 id, 自增 */
- var id: Int = -1)
任务调度
使用 quartz 进行任务调度
- private fun scheduleJob(task: ScheduleTask) {
- val job = JobBuilder.newJob(TaskEmitterJob::class.java)
- .withIdentity(task.taskId, task.appId.toString())
- .withDescription(task.description)
- .storeDurably()
- .requestRecovery()
- .usingJobData("id", task.id)
- .usingJobData("taskId", task.taskId)
- .build()
- val trigger = TriggerBuilder.newTrigger()
- .withIdentity(task.taskId, task.appId.toString())
- .withSchedule(CronScheduleBuilder.cronSchedule(task.cron))
- .forJob(job)
- .build()
- scheduler.addJob(job, true, true)
- if (scheduler.checkExists(trigger.key)) {
- scheduler.rescheduleJob(trigger.key, trigger)
- } else {
- scheduler.scheduleJob(trigger)
- }
- }
ScheduleTask 是持久化的, 插入的时候同时向 quartz 插入任务, 更新的时候也要向 quartz 更新, 删除的时候同时删除
quartz 的任务触发
- class TaskEmitterJob : Job {
- companion object {
- private val log = LogFactory.getLog(TaskEmitterJob::class.java)
- }
- override fun execute(context: JobExecutionContext) {
- try {
- val taskId = context.jobDetail.jobDataMap["taskId"] as String
- log.info("任务分发:$taskId")
- val service = ScheduleCenterApplication.context.getBean(ScheduleTaskService::class.java)
- service.launch(taskId)
- } catch (e: Exception) {
- log.error("任务失败 $[taskId]", e)
- }
- }
- }
rabbitmq 的发送逻辑
- /**
- * 发布定时任务事件
- */
- fun launch(task: ScheduleTask) {
- val exchange = when (task.dispatchMode) {
- Cluster -> "aegisScheduleCluster"
- Singleton -> "aegisScheduleSingleton"
- }
- val routingKey = when (task.dispatchMode) {
- Cluster -> exchange
- Singleton -> "$exchange.${task.appName}"
- }
- val executeTaskInfo = ScheduleTaskInfo(task.taskId, task.appName!!)
- amqpTemplate.convertAndSend(exchange, routingKey,
- executeTaskInfo)
- taskExecuteRecordDAO.save(
- TaskExecuteRecord(executeTaskInfo.uid, task.id, Date())
- )
- }
客户端 spring boot starter 的实现
定义定时任务接口, 只要在项目中实现该接口并将实现声明为 bean, 即可完成定时任务的定义
- @FunctionalInterface
- interface ScheduledJob {
- /**
- * 执行定时任务
- */
- fun execute(properties: Map<String, Any>)
- /**
- * 获取定时任务 id
- * @return 定时任务 id, 对应任务分发中心 ScheduleTask 的 taskId
- */
- fun getId(): String
- }
接收任务
- /**
- * 接收单例任务
- */
- @StreamListener(SINGLETON_INPUT)
- fun acceptGroupTask(taskInfo: ScheduleTaskInfo) {
- if (taskInfo.App == application) {
- val receivedTime = Date()
- val job = jobsProvider.ifAvailable?.firstOrNull {
- it.getId() == taskInfo.id
- }
- job?.execute(taskInfo.properties ?: mapOf())
- singletonOutput.send(GenericMessage(
- ConfirmInfo(taskInfo.id, taskInfo.uid, job != null, receivedTime, Date())
- ))
- }
- }
集群全体执行任务与单例任务的区别只在 stream 的配置, 一个需要声明 binding 的 group, 一个不需要, 这属于 Spring Cloud Stream 的知识范畴, 可以自己看官方文档或查看我前面提到的文档, 如果有不懂的可以私聊我.
stream 的事件流声明
- /**
- * 定时任务信息的事件流接口
- * @author 吴昊
- * @since 0.1.0
- */
- interface AegisScheduleClient {
- companion object {
- const val CLUSTER_INPUT = "aegisScheduleClusterInput"
- const val SINGLETON_INPUT = "aegisScheduleSingletonInput"
- const val CONFIRM_OUTPUT = "aegisScheduleGroupOutput"
- }
- /**
- *
- * @return
- */
- @Input(CLUSTER_INPUT)
- fun scheduleInput(): SubscribableChannel
- /**
- *
- * @return
- */
- @Input(SINGLETON_INPUT)
- fun singletonScheduleInput(): SubscribableChannel
- /**
- *
- * @return
- */
- @Output(CONFIRM_OUTPUT)
- fun confirmOutput(): MessageChannel
- }
最后再加上服务端确认消息的接收代码:
- @StreamListener(CONFIRM_INPUT)
- fun acceptGroupTask(confirmInfo: ConfirmInfo) {
- LOG.info("接收到确认消息:$confirmInfo")
- scheduleTaskService.confirm(confirmInfo)
- }
主要的代码已经全部放上来了, 整体思路也很简单, 后面仍有很多需要优化的地方, 例如消息推送失败, 或者确认消息未送达等等, 于整体设计并没有多大的影响了.
这样在微服务端如果需要添加定时任务, 只需要
引入 starter
实现 ScheduledJob 接口
在任务调度中心添加任务
至于在任务中心添加任务, 主题代码有了, 实现个简单管理界面很容易对不对, 也就几个字段的输入.
最后附上管理界面的截图:
任务列表
任务详情
我的其他文章:
Spring Cloud 微服务架构下的 WebSocket 解决方案
Mybatis 去 xml 化: 我再也不想写 xml 了
Spring Security OAuth2 缓存使用 jackson 序列化的处理
来源: https://juejin.im/post/5c24c3d86fb9a04a0c2e8180