LTS 用户文档
LTS(light-task-scheduler)主要用于解决分布式任务调度问题, 支持实时任务, 定时任务和 Cron 任务. 有较好的伸缩性, 扩展性, 健壮稳定性而被多家公司使用, 同时也希望开源爱好者一起贡献.
项目地址
GitHub 地址:
oschina 地址: http://git.oschina.net/hugui/light-task-scheduler
例子: https://github.com/ltsopensource/lts-examples
框架概况
LTS 有主要有以下四种节点:
JobClient: 主要负责提交任务, 并接收任务执行反馈结果.
JobTracker: 负责接收并分配任务, 任务调度.
TaskTracker: 负责执行任务, 执行完反馈给 JobTracker.
LTS-Admin:(管理后台)主要负责节点管理, 任务队列管理, 监控管理等.
其中 JobClient,JobTracker,TaskTracker 节点都是无状态的. 可以部署多个并动态的进行删减, 来实现负载均衡, 实现更大的负载量, 并且框架采用 FailStore 策略使 LTS 具有很好的容错能力.
LTS 注册中心提供多种实现 (Zookeeper,Redis 等), 注册中心进行节点信息暴露, master 选举.(Mongo or MySQL) 存储任务队列和任务执行日志, netty or mina 做底层通信, 并提供多种序列化方式 fastjson, hessian2, java 等.
LTS 支持任务类型:
实时任务: 提交了之后立即就要执行的任务.
定时任务: 在指定时间点执行的任务, 譬如 今天 3 点执行(单次).
Cron 任务: CronExpression, 和 quartz 类似 (但是不是使用 quartz 实现的) 譬如 0 0/1 * * * ?
支持动态修改任务参数, 任务执行时间等设置, 支持后台动态添加任务, 支持 Cron 任务暂停, 支持手动停止正在执行的任务(有条件), 支持任务的监控统计, 支持各个节点的任务执行监控, JVM 监控等等.
架构图
LTS architecture
概念说明
节点组
英文名称 NodeGroup, 一个节点组等同于一个小的集群, 同一个节点组中的各个节点是对等的, 等效的, 对外提供相同的服务.
每个节点组中都有一个 master 节点, 这个 master 节点是由 LTS 动态选出来的, 当一个 master 节点挂掉之后, LTS 会立马选出另外一个 master 节点, 框架提供 API 监听接口给用户.
FailStore
顾名思义, 这个主要是用于失败了存储的, 主要用于节点容错, 当远程数据交互失败之后, 存储在本地, 等待远程通信恢复的时候, 再将数据提交.
FailStore 主要用户 JobClient 的任务提交, TaskTracker 的任务反馈, TaskTracker 的业务日志传输的场景下.
FailStore 目前提供几种实现: leveldb,rocksdb,berkeleydb,mapdb,ltsdb, 用于可以自由选择使用哪种, 用户也可以采用 SPI 扩展使用自己的实现.
流程图
下图是一个标准的实时任务执行流程.
LTS progress
image.PNG
目前后台带有由 ztajy 提供的一个简易的认证功能. 用户名密码在 auth.cfg 中, 用户自行修改.
特性
1,Spring 支持
LTS 可以完全不用 Spring 框架, 但是考虑到很用用户项目中都是用了 Spring 框架, 所以 LTS 也提供了对 Spring 的支持, 包括 xml 和注解, 引入 lts-spring.jar 即可.
2, 业务日志记录器
在 TaskTracker 端提供了业务日志记录器, 供应用程序使用, 通过这个业务日志器, 可以将业务日志提交到 JobTracker, 这些业务日志可以通过任务 ID 串联起来, 可以在 LTS-Admin 中实时查看任务的执行进度.
3,SPI 扩展支持
SPI 扩展可以达到零侵入, 只需要实现相应的接口, 并实现即可被 LTS 使用, 目前开放出来的扩展接口有
对任务队列的扩展, 用户可以不选择使用 MySQL 或者 mongo 作为队列存储, 也可以自己实现.
对业务日志记录器的扩展, 目前主要支持 console,MySQL,mongo, 用户也可以通过扩展选择往其他地方输送日志.
4, 故障转移
当正在执行任务的 TaskTracker 宕机之后, JobTracker 会立马将分配在宕机的 TaskTracker 的所有任务再分配给其他正常的 TaskTracker 节点执行.
5, 节点监控
可以对 JobTracker,TaskTracker 节点进行资源监控, 任务监控等, 可以实时的在 LTS-Admin 管理后台查看, 进而进行合理的资源调配.
6, 多样化任务执行结果支持
LTS 框架提供四种执行结果支持, EXECUTE_SUCCESS,EXECUTE_FAILED,EXECUTE_LATER,EXECUTE_EXCEPTION, 并对每种结果采取相应的处理机制, 譬如重试.
EXECUTE_SUCCESS: 执行成功, 这种情况, 直接反馈客户端(如果任务被设置了要反馈给客户端).
EXECUTE_FAILED: 执行失败, 这种情况, 直接反馈给客户端, 不进行重试.
EXECUTE_LATER: 稍后执行(需要重试), 这种情况, 不反馈客户端, 重试策略采用 1min,2min,3min 的策略, 默认最大重试次数为 10 次, 用户可以通过参数设置修改这个重试次数.
EXECUTE_EXCEPTION: 执行异常, 这种情况也会重试(重试策略, 同上)
7,FailStore 容错
采用 FailStore 机制来进行节点容错, Fail And Store, 不会因为远程通信的不稳定性而影响当前应用的运行. 具体 FailStore 说明, 请参考概念说明中的 FailStore 说明.
项目编译打包
项目主要采用 maven 进行构建, 目前提供 shell 脚本的打包. 环境依赖: Java(jdk1.6+) Maven
用户使用一般分为两种:
1,Maven 构建
可以通过 maven 命令将 lts 的 jar 包上传到本地仓库中. 在父 pom.xml 中添加相应的 repository, 并用 deploy 命令上传即可. 具体引用方式可以参考 lts 中的例子即可.
2, 直接 Jar 引用
需要将 lts 的各个模块打包成单独的 jar 包, 并且将所有 lts 依赖包引入. 具体引用哪些 jar 包可以参考 lts 中的例子即可.
JobTracker 和 LTS-Admin 部署
提供(cmd)Windows 和(shell)Linux 两种版本脚本来进行编译和部署:
运行根目录下的 sh build.sh 或 build.cmd 脚本, 会在 dist 目录下生成 lts-{version}-bin 文件夹
下面是其目录结构, 其中 bin 目录主要是 JobTracker 和 LTS-Admin 的启动脚本. jobtracker 中是 JobTracker 的配置文件和需要使用到的 jar 包, lts-admin 是 LTS-Admin 相关的 war 包和配置文件. lts-{version}-bin 的文件结构
- -- lts-${
- version
- }-bin
- |-- bin
- | |-- jobtracker.cmd
- | |-- jobtracker.sh
- | |-- lts-admin.cmd
- | |-- lts-admin.sh
- | |-- lts-monitor.cmd
- | |-- lts-monitor.sh
- | |-- tasktracker.sh
- |-- conf
- | |-- log4j.properties
- | |-- lts-admin.cfg
- | |-- lts-monitor.cfg
- | |-- readme.txt
- | |-- tasktracker.cfg
- | |-- zoo
- | |-- jobtracker.cfg
- | |-- log4j.properties
- | |-- lts-monitor.cfg
- |-- lib
- | |-- *.jar
- |-- war
- |-- jetty
- | |-- lib
- | |-- *.jar
- |-- lts-admin.war
JobTracker 启动. 如果你想启动一个节点, 直接修改下 conf/zoo 下的配置文件, 然后运行 sh jobtracker.sh zoo start 即可, 如果你想启动两个 JobTracker 节点, 那么你需要拷贝一份 zoo, 譬如命名为 zoo2, 修改下 zoo2 下的配置文件, 然后运行 sh jobtracker.sh zoo2 start 即可. logs 文件夹下生成 jobtracker-zoo.out 日志.
LTS-Admin 启动. 修改 conf/lts-monitor.cfg 和 conf/lts-admin.cfg 下的配置, 然后运行 bin 下的 sh lts-admin.sh 或 lts-admin.cmd 脚本即可. logs 文件夹下会生成 lts-admin.out 日志, 启动成功在日志中会打印出访问地址, 用户可以通过这个访问地址访问了.
JobClient(部署)使用
需要引入 lts 的 jar 包有 lts-jobclient-{version}.jar,lts-core-{version}.jar 及其它第三方依赖 jar.
API 方式启动
- JobClient jobClient = new RetryJobClient();
- jobClient.setNodeGroup("test_jobClient");
- jobClient.setClusterName("test_cluster");
- jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
- jobClient.start();
- // 提交任务
- Job job = new Job();
- job.setTaskId("3213213123");
- job.setParam("shopId", "11111");
- job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
- // job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression 表达式
- // job.setTriggerTime(new Date()); // 支持指定时间执行
- Response response = jobClient.submitJob(job);
Spring xml 方式启动
- <bean id="jobClient" class="com.github.ltsopensource.spring.JobClientFactoryBean">
- <property name="clusterName" value="test_cluster"/>
- <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
- <property name="nodeGroup" value="test_jobClient"/>
- <property name="masterChangeListeners">
- <list>
- <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/>
- </list>
- </property>
- <property name="jobFinishedHandler">
- <bean class="com.github.ltsopensource.example.support.JobFinishedHandlerImpl"/>
- </property>
- <property name="configs">
- <props>
- <!-- 参数 -->
- <prop key="job.fail.store">leveldb</prop>
- </props>
- </property>
- </bean>
Spring 全注解方式
- @Configuration
- public class LTSSpringConfig {
- @Bean(name = "jobClient")
- public JobClient getJobClient() throws Exception {
- JobClientFactoryBean factoryBean = new JobClientFactoryBean();
- factoryBean.setClusterName("test_cluster");
- factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
- factoryBean.setNodeGroup("test_jobClient");
- factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
- new MasterChangeListenerImpl()
- });
- Properties configs = new Properties();
- configs.setProperty("job.fail.store", "leveldb");
- factoryBean.setConfigs(configs);
- factoryBean.afterPropertiesSet();
- return factoryBean.getObject();
- }
- }
- TaskTracker(部署使用)
需要引入 lts 的 jar 包有 lts-tasktracker-{version}.jar,lts-core-{version}.jar 及其它第三方依赖 jar. ### 定义自己的任务执行类
- public class MyJobRunner implements JobRunner {
- @Override
- public Result run(JobContext jobContext) throws Throwable {
- try {
- // TODO 业务逻辑
- // 会发送到 LTS (JobTracker 上)
- jobContext.getBizLogger().info("测试, 业务日志啊啊啊啊啊");
- } catch (Exception e) {
- return new Result(Action.EXECUTE_FAILED, e.getMessage());
- }
- return new Result(Action.EXECUTE_SUCCESS, "执行成功了, 哈哈");
- }
- }
API 方式启动
- TaskTracker taskTracker = new TaskTracker();
- taskTracker.setJobRunnerClass(MyJobRunner.class);
- taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
- taskTracker.setNodeGroup("test_trade_TaskTracker");
- taskTracker.setClusterName("test_cluster");
- taskTracker.setWorkThreads(20);
- taskTracker.start();
Spring xml 方式启动
- <bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
- <property name="jobRunnerClass" value="com.github.ltsopensource.example.support.MyJobRunner"/>
- <property name="bizLoggerLevel" value="INFO"/>
- <property name="clusterName" value="test_cluster"/>
- <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
- <property name="nodeGroup" value="test_trade_TaskTracker"/>
- <property name="workThreads" value="20"/>
- <property name="masterChangeListeners">
- <list>
- <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/>
- </list>
- </property>
- <property name="configs">
- <props>
- <prop key="job.fail.store">leveldb</prop>
- </props>
- </property>
- </bean>
Spring 注解方式启动
- @Configuration
- public class LTSSpringConfig implements ApplicationContextAware {
- private ApplicationContext applicationContext;
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = applicationContext;
- }
- @Bean(name = "taskTracker")
- public TaskTracker getTaskTracker() throws Exception {
- TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean();
- factoryBean.setApplicationContext(applicationContext);
- factoryBean.setClusterName("test_cluster");
- factoryBean.setJobRunnerClass(MyJobRunner.class);
- factoryBean.setNodeGroup("test_trade_TaskTracker");
- factoryBean.setBizLoggerLevel("INFO");
- factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
- factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
- new MasterChangeListenerImpl()
- });
- factoryBean.setWorkThreads(20);
- Properties configs = new Properties();
- configs.setProperty("job.fail.store", "leveldb");
- factoryBean.setConfigs(configs);
- factoryBean.afterPropertiesSet();
- // factoryBean.start();
- return factoryBean.getObject();
- }
- }
参数说明
参数说明
使用建议
一般在一个 JVM 中只需要一个 JobClient 实例即可, 不要为每种任务都新建一个 JobClient 实例, 这样会大大的浪费资源, 因为一个 JobClient 可以提交多种任务. 相同的一个 JVM 一般也尽量保持只有一个 TaskTracker 实例即可, 多了就可能造成资源浪费. 当遇到一个 TaskTracker 要运行多种任务的时候, 请参考下面的 "一个 TaskTracker 执行多种任务".
一个 TaskTracker 执行多种任务
有的时候, 业务场景需要执行多种任务, 有些人会问, 是不是要每种任务类型都要一个 TaskTracker 去执行. 我的答案是否定的, 如果在一个 JVM 中, 最好使用一个 TaskTracker 去运行多种任务, 因为一个 JVM 中使用多个 TaskTracker 实例比较浪费资源(当然当你某种任务量比较多的时候, 可以将这个任务单独使用一个 TaskTracker 节点来执行). 那么怎么才能实现一个 TaskTracker 执行多种任务呢. 下面是我给出来的参考例子.
- /**
- * 总入口, 在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class)
- * JobClient 提交 任务时指定 Job 类型 job.setParam("type", "aType")
- */
- public class JobRunnerDispatcher implements JobRunner {
- private static final ConcurrentHashMap<String/*type*/, JobRunner>
- JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>();
- static {
- JOB_RUNNER_MAP.put("aType", new JobRunnerA()); // 也可以从 Spring 中拿
- JOB_RUNNER_MAP.put("bType", new JobRunnerB());
- }
- @Override
- public Result run(JobContext jobContext) throws Throwable {
- Job job = jobContext.getJob();
- String type = job.getParam("type");
- return JOB_RUNNER_MAP.get(type).run(job);
- }
- }
- class JobRunnerA implements JobRunner {
- @Override
- public Result run(JobContext jobContext) throws Throwable {
- // TODO A 类型 Job 的逻辑
- return null;
- }
- }
- class JobRunnerB implements JobRunner {
- @Override
- public Result run(JobContext jobContext) throws Throwable {
- // TODO B 类型 Job 的逻辑
- return null;
- }
- }
TaskTracker 的 JobRunner 测试
一般在编写 TaskTracker 的时候, 只需要测试 JobRunner 的实现逻辑是否正确, 又不想启动 LTS 进行远程测试. 为了方便测试, LTS 提供了 JobRunner 的快捷测试方法. 自己的测试类集成 com.GitHub.ltsopensource.tasktracker.runner.JobRunnerTester 即可, 并实现 initContext 和 newJobRunner 方法即可. 如 lts-examples 中的例子:
- public class TestJobRunnerTester extends JobRunnerTester {
- public static void main(String[] args) throws Throwable {
- // Mock Job 数据
- Job job = new Job();
- job.setTaskId("2313213");
- JobContext jobContext = new JobContext();
- jobContext.setJob(job);
- JobExtInfo jobExtInfo = new JobExtInfo();
- jobExtInfo.setRetry(false);
- jobContext.setJobExtInfo(jobExtInfo);
- // 运行测试
- TestJobRunnerTester tester = new TestJobRunnerTester();
- Result result = tester.run(jobContext);
- System.out.println(JSON.toJSONString(result));
- }
- @Override
- protected void initContext() {
- // TODO 初始化 Spring 容器
- }
- @Override
- protected JobRunner newJobRunner() {
- return new TestJobRunner();
- }
- }
Spring Quartz Cron 任务无缝接入
对于 Quartz 的 Cron 任务只需要在 Spring 配置中增加一下代码就可以接入 LTS 平台
- <bean class="com.github.ltsopensource.spring.quartz.QuartzLTSProxyBean">
- <property name="clusterName" value="test_cluster"/>
- <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
- <property name="nodeGroup" value="quartz_test_group"/>
- </bean>
Spring Boot 支持
- @SpringBootApplication
- @EnableJobTracker // 启动 JobTracker
- @EnableJobClient // 启动 JobClient
- @EnableTaskTracker // 启动 TaskTracker
- @EnableMonitor // 启动 Monitor
- public class Application {
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
- }
剩下的就只是在 application.properties 中添加相应的配置就行了, 具体见 lts-example 中的 com.GitHub.ltsopensource.examples.springboot 包下的例子
多网卡选择问题
当机器有内网两个网卡的时候, 有时候, 用户想让 LTS 的流量走外网网卡, 那么需要在 host 中, 把主机名称的映射地址改为外网网卡地址即可, 内网同理.
关于节点标识问题
如果在节点启动的时候设置节点标识, LTS 会默认设置一个 UUID 为节点标识, 可读性会比较差, 但是能保证每个节点的唯一性, 如果用户能自己保证节点标识的唯一性, 可以通过 setIdentity 来设置, 譬如如果每个节点都是部署在一台机器 (一个虚拟机) 上, 那么可以将 identity 设置为主机名称
SPI 扩展说明
支持 JobLogger,JobQueue 等等的 SPI 扩展
和其它解决方案比较
LTS-Admin 使用 jetty 启动(默认), 不定期挂掉解决方案 见 issue#389
来源: http://www.jianshu.com/p/e909d21fba73