一, 前言
在我们的项目当中, 使用定时任务是避免不了的, 我们在部署定时任务时, 通常只部署一台机器. 部署多台机器时, 同一个任务会执行多次. 比如短信提醒, 每天定时的给用户下发短信, 如果部署了多台, 同一个用户将发送多条. 只部署一台机器, 可用性又无法保证. 今天向大家介绍一款开源产品, 分布式定时任务解决方案 ---- elastic-job.
二, 简介
Elastic-Job 是一个分布式调度解决方案, 由两个相互独立的子项目 Elastic-Job-Lite 和 Elastic-Job-Cloud 组成. 在我们的项目中使用了轻量级无中心化解决方案, Elastic-Job-Lite.
1, 分片概念
任务的分布式执行, 需要将一个任务拆分为多个独立的任务项, 然后由分布式的服务器分别执行某一个或几个分片项.
例如: 有一个遍历数据库某张表的作业, 现有 2 台服务器. 为了快速的执行作业, 那么每台服务器应执行作业的 50%. 为满足此需求, 可将作业分成 2 片, 每台服务器执行 1 片. 作业遍历数据的逻辑应为: 服务器 A 遍历 ID 以奇数结尾的数据; 服务器 B 遍历 ID 以偶数结尾的数据. 如果分成 10 片, 则作业遍历数据的逻辑应为: 每片分到的分片项应为 ID%10, 而服务器 A 被分配到分片项 0,1,2,3,4; 服务器 B 被分配到分片项 5,6,7,8,9, 直接的结果就是服务器 A 遍历 ID 以 0-4 结尾的数据; 服务器 B 遍历 ID 以 5-9 结尾的数据.
Elastic-Job 并不直接提供数据处理的功能, 框架只会将分片项分配至各个运行中的作业服务器, 开发者需要自行处理分片项与真实数据的对应关系.
2, 作业高可用
上述作业中, 如果有一个应用挂掉, 分片项将会重新分片, 没有挂掉的应用将获得分片项 0-9.
三, 实际应用
这里我们采用大家都比较熟悉的基于 spring 配置文件的配置.
1, 引入 jar 包
- <!-- 引入 elastic-job-lite 核心模块 -->
- <dependency>
- <groupId>com.dangdang</groupId>
- <artifactId>elastic-job-lite-core</artifactId>
- <version>${latest.release.version}</version>
- </dependency>
- <!-- 使用 springframework 自定义命名空间时引入 -->
- <dependency>
- <groupId>com.dangdang</groupId>
- <artifactId>elastic-job-lite-spring</artifactId>
- <version>${latest.release.version}</version>
- </dependency>
2, 作业程序
- public class MyElasticJob implements SimpleJob {
- @Override
- public void execute(ShardingContext context) {
- switch (context.getShardingItem()) {
- case 0:
- // do something by sharding item 0
- break;
- case 1:
- // do something by sharding item 1
- break;
- case 2:
- // do something by sharding item 2
- break;
- // case n: ...
- }
- }
- }
我们的定时任务要实现 SimpleJob 接口, 并实现 execute 方法. 在写程序时, 我们通常不会用 case 区分不同的分片, context.getShardingItem() 可以获得当前的分片项, context.getShardingTotalCount() 获得总分片数. 我们把当前分片项, 总分片数传入到 sql 中, 按照规则字段取模, 检索出该分片处理的数据, 再进行处理.
3,spring 配置
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
- xmlns:job="http://www.dangdang.com/schema/ddframe/job"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.dangdang.com/schema/ddframe/reg
- http://www.dangdang.com/schema/ddframe/reg/reg.xsd
- http://www.dangdang.com/schema/ddframe/job
- http://www.dangdang.com/schema/ddframe/job/job.xsd
- ">
- <!-- 配置作业注册中心 -->
- <reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
- <!-- 配置作业 -->
- <job:simple id="oneOffElasticJob" overwrite="true" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /> </beans>
作业中心我们采用 zookeeper, 我们项目中采用做小的 zk 集群, 3 台. 在作业中心配置中, server-lists 填写 3 台 zk 地址, 用 "," 隔开, zk1:port1,zk2:port2,zk3:port3. 下面就是我们作业的具体实现的配置规则, class 实现类, registry-center-ref 配置中心 zk 的 id(regCenter),cron 定时任务规则, sharding-total-count 总分片数.
overwrite="true" 这个配置很重要, 因为这些配置都要上传到 zk 中, 当你改变了配置之后, zk 中并没有改变, 执行的任务还是旧的. 所以要加上这个配置.
这样, 我们的分布式定时任务就配置好了, 剩下的就是部署, 上面的例子中, 我们的总分片数是 4, 如果我们部署 2 台机器, 每台机器将获得 2 个分片, 部署 4 台机器, 每台机器获得一个分片. 如果出现宕机情况, 分片将重新分配, 从而做到高可用.
四, 总结
当当的这款开源产品是非常棒的, 解决了我的项目中定时任务的单点问题, 使系统有了高可用的保证. 要说缺点嘛, 也有一个, 就是每一个任务都需要新写一个类实 SimpleJob 接口.
来源: http://www.jianshu.com/p/4118348f01b6