本文基于 Elastic-Job V2.1.5 版本分享
1. 概述
. ElasticJobListener
. AbstractDistributeOnceElasticJobListener
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复.甚至不知道如何读源码也可以请教噢.
新的源码解析文章实时收到通知.每周更新一篇左右.
认真的源码交流微信群.
1. 概述
本文主要分享 Elastic-Job-Lite 作业监听器.
涉及到主要类的类图如下 ( 打开大图 ):
绿色监听器接口 ElasticJobListener,每台作业节点均执行.
粉色监听器接口 AbstractDistributeOnceElasticJobListener,分布式场景中仅单一节点执行.
蓝色类在
com.dangdang.ddframe.job.lite.internal.guarantee
里,保证分布式任务全部开始和结束状态. AbstractDistributeOnceElasticJobListener 通过 guarantee 功能,实现分布式场景中仅单一节点执行.
你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 Elastic-Job 点赞! 传送门
2. ElasticJobListener
ElasticJobListener,作业监听器接口,每台作业节点均执行.
若作业处理作业服务器的文件,处理完成后删除文件,可考虑使用每个节点均执行清理任务.此类型任务实现简单,且无需考虑全局分布式任务是否完成,请尽量使用此类型监听器.
接口代码如下:
* 作业执行前的执行的方法.
public interface ElasticJobListener {
/**
*
* @param shardingContexts 分片上下文
* 作业执行后的执行的方法.
* /
void beforeJobExecuted(final ShardingContexts shardingContexts);
/ * *
*
* @param shardingContexts 分片上下文
调用执行如下:
* /
void afterJobExecuted(final ShardingContexts shardingContexts);
}/
JobFacade 对作业监听器简单封装进行调用.
// AbstractElasticJobExecutor.java
public final void execute() {
// ...省略无关代码
// 执行 作业执行前的方法
try {
jobFacade.beforeJobExecuted(shardingContexts);
} catch(final Throwable cause) {
jobExceptionHandler.handleException(jobName, cause);
}
// ...省略无关代码(执行 普通触发的作业)
// ...省略无关代码(执行 被跳过触发的作业)
// ...省略无关代码(执行 作业失效转移)
// ...执行 作业执行后的方法
try {
jobFacade.afterJobExecuted(shardingContexts);
} catch(final Throwable cause) {
jobExceptionHandler.handleException(jobName, cause);
}
}
下文提到的 AbstractDistributeOnceElasticJobListener,也是这么调用.
// LiteJobFacade.java
@Override
public void beforeJobExecuted(final ShardingContexts shardingContexts) {
for (ElasticJobListener each : elasticJobListeners) {
each.beforeJobExecuted(shardingContexts);
}
}
@Override
public void afterJobExecuted(final ShardingContexts shardingContexts) {
for (ElasticJobListener each : elasticJobListeners) {
each.afterJobExecuted(shardingContexts);
}
}
3. AbstractDistributeOnceElasticJobListener
AbstractDistributeOnceElasticJobListener,在分布式作业中只执行一次的监听器.
若作业处理数据库数据,处理完成后只需一个节点完成数据清理任务即可.此类型任务处理复杂,需同步分布式环境下作业的状态同步,提供了超时设置来避免作业不同步导致的死锁,请谨慎使用.
创建 AbstractDistributeOnceElasticJobListener 代码如下:
* 开始超时时间
public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener {
/**
* 开始等待对象
* /
private final long startedTimeoutMilliseconds;
/ * *
* 完成超时时间
* /
private final Object startedWait = new Object();
/ * *
* 完成等待对象
* /
private final long completedTimeoutMilliseconds;
/ * *
* 保证分布式任务全部开始和结束状态的服务
* /
private final Object completedWait = new Object();
/ * *
超时参数
* /
@Setter private GuaranteeService guaranteeService;
private TimeService timeService = new TimeService();
public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) {
if (startedTimeoutMilliseconds <= 0L) {
this.startedTimeoutMilliseconds = Long.MAX_VALUE;
} else {
this.startedTimeoutMilliseconds = startedTimeoutMilliseconds;
}
if (completedTimeoutMilliseconds <= 0L) {
this.completedTimeoutMilliseconds = Long.MAX_VALUE;
} else {
this.completedTimeoutMilliseconds = completedTimeoutMilliseconds;
}
}
}/
务必传递,避免作业不同步导致的死锁.
startedTimeoutMilliseconds
,
completedTimeoutMilliseconds
来源: https://juejin.im/entry/5a59307751882573450178ac