xxl-job 介绍
xxl-job 是一个分布式任务调度系统, 早期版本基于 quartz 实现调度器.
quartz 是基于数据库的 for update 加锁, 来保证同一个任务在同一时间只会被触发一次.
最新版本的 xxl-job 已经摒弃了 quartz.
xxl-job 核心模块
1, 调度中心, 也就是任务的管理系统
2, 执行器, 任务真正的执行服务, 一般是分布式的服务.
任务执行过程
1, 调度中心
1.1 点击执行的时候, 入口 JobInfoController.triggerJob 方法:
进入 JobTriggerPoolHelper.trigger 方法, 调用了 JobTriggerPoolHelper.addTrigger 方法, 那么看一下 addTrigger 方法:
- /**
- * Submits a trigger request for a job to one of the two trigger thread pools.
- *
- * <p>The fast pool is used by default. Every trigger that takes longer than 500 ms is counted
- * per job; once a job has been counted more than 10 times within the current minute, its
- * further triggers are routed to the slow pool. The counters are cleared
- * whenever the minute changes.
- *
- * <p>Note from the original author (pool configuration is not visible here): the fast pool has
- * 8 core threads, at most 200 threads and a queue of 1000; the slow pool has 0 core threads,
- * at most 100 threads and a queue of 2000.
- *
- * @param jobId id of the job to trigger
- * @param triggerType what caused this trigger
- * @param failRetryCount retry count passed through to XxlJobTrigger.trigger
- * @param executorShardingParam sharding parameter passed through to XxlJobTrigger.trigger
- * @param executorParam executor parameter passed through to XxlJobTrigger.trigger
- */
- public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {
- // choose thread pool: fast by default, slow once the job was slow more than 10 times this minute
- ThreadPoolExecutor triggerPool_ = fastTriggerPool;
- AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
- if (jobTimeoutCount!=null && jobTimeoutCount.get()> 10) { // job-timeout 10 times in 1 min
- triggerPool_ = slowTriggerPool;
- }
- // trigger
- triggerPool_.execute(new Runnable() {
- @Override
- public void run() {
- long start = System.currentTimeMillis();
- try {
- // fire the job
- XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- } finally {
- // check timeout-count-map: reset all counters when a new minute starts
- long minTim_now = System.currentTimeMillis()/60000;
- if (minTim != minTim_now) {
- minTim = minTim_now;
- jobTimeoutCountMap.clear();
- }
- // incr timeout-count-map
- long cost = System.currentTimeMillis()-start;
- if (cost> 500) { // a trigger slower than 500 ms counts as one slow run for this job
- // putIfAbsent keeps the counter that is already stored; a plain put() would replace it
- // with a fresh counter of 1 on every slow run, so the threshold could never be reached
- AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
- if (timeoutCount != null) {
- timeoutCount.incrementAndGet();
- }
- }
- }
- }
- });
- }
进入执行任务: XxlJobTrigger.trigger
- /**
- * Loads the job and its executor group, resolves the sharding parameter and dispatches the
- * trigger through processTrigger.
- *
- * <p>With the SHARDING_BROADCAST route strategy, a non-empty registry list and no explicit
- * sharding parameter, processTrigger is called once per registered address. In every other
- * case it is called once, with the parsed sharding parameter or the default {0, 1}.
- *
- * @param jobId id of the job to trigger; a warning is logged and nothing happens if it is unknown
- * @param triggerType what caused this trigger
- * @param failRetryCount retry count; a negative value means "use the value stored on the job"
- * @param executorShardingParam optional sharding parameter in the form "index/total"
- * @param executorParam optional parameter that overrides the one stored on the job
- */
- public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
- // look up the job definition by id
- XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
- if (jobInfo == null) {
- logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
- return;
- }
- if (executorParam != null) {
- jobInfo.setExecutorParam(executorParam);
- }
- int finalFailRetryCount;
- if (failRetryCount >= 0) {
- finalFailRetryCount = failRetryCount;
- } else {
- finalFailRetryCount = jobInfo.getExecutorFailRetryCount();
- }
- // load the executor group the job belongs to
- XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
- // parse "index/total"; anything that is not two numeric parts is ignored
- int[] shardingParam = null;
- if (executorShardingParam != null) {
- String[] parts = executorShardingParam.split("/");
- if (parts.length == 2 && isNumeric(parts[0]) && isNumeric(parts[1])) {
- shardingParam = new int[] {Integer.valueOf(parts[0]), Integer.valueOf(parts[1])};
- }
- }
- boolean broadcastToAll = ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
- && group.getRegistryList() != null && !group.getRegistryList().isEmpty()
- && shardingParam == null;
- if (!broadcastToAll) {
- // single trigger: fall back to shard 0 of 1 when no sharding parameter was given
- if (shardingParam == null) {
- shardingParam = new int[] {0, 1};
- }
- processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
- return;
- }
- // broadcast: one trigger per registered executor address
- int total = group.getRegistryList().size();
- for (int shard = 0; shard < total; shard++) {
- processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shard, total);
- }
- }
进入 processTrigger 方法, 组装任务参数, 选择路由和阻塞策略
- /**
- * Performs one trigger of a job against one executor address and records the outcome in the job log.
- *
- * <p>Steps: (1) save a new log row, (2) build the TriggerParam, (3) pick the executor address,
- * (4) call the remote executor, (5) assemble the trigger message, (6) update the log row.
- *
- * @param group executor group of the job; its registry list supplies the candidate addresses
- * @param jobInfo job definition
- * @param finalFailRetryCount retry count to record in the log
- * @param triggerType what caused this trigger
- * @param index shard index; with SHARDING_BROADCAST it also selects the address from the registry list
- * @param total shard total
- */
- private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
- // block strategy, defaulting to SERIAL_EXECUTION when the configured value does not match
- ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
- // route strategy (the original author notes that 10 strategies ship officially)
- ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
- String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
- // 1,save log-id
- XxlJobLog jobLog = new XxlJobLog();
- jobLog.setJobGroup(jobInfo.getJobGroup());
- jobLog.setJobId(jobInfo.getId());
- jobLog.setTriggerTime(new Date());
- XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
- logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
- // 2,init trigger-param
- // Two custom parameters are added at the end of this section:
- // 1, the assigned (fixed) executor address
- // 2, the number of empty queue polls a job thread tolerates
- TriggerParam triggerParam = new TriggerParam();
- triggerParam.setJobId(jobInfo.getId());
- triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
- triggerParam.setExecutorParams(jobInfo.getExecutorParam());
- triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
- triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
- triggerParam.setLogId(jobLog.getId());
- triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
- triggerParam.setGlueType(jobInfo.getGlueType());
- triggerParam.setGlueSource(jobInfo.getGlueSource());
- triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
- triggerParam.setBroadcastIndex(index);
- triggerParam.setBroadcastTotal(total);
- triggerParam.setAssignAddress(jobInfo.getAssignAddress());
- triggerParam.setJobEmptyLoopNum(jobInfo.getJobEmptyLoopNum());
- // 3,init address: choose the executor address
- String address = null;
- ReturnT<String> routeAddressResult = null;
- if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
- if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
- // broadcast: the shard index selects the address; out-of-range falls back to the first one
- if (index <group.getRegistryList().size()) {
- address = group.getRegistryList().get(index);
- } else {
- address = group.getRegistryList().get(0);
- }
- } else {
- // let the configured route strategy pick an address from the registry list
- routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
- if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
- address = routeAddressResult.getContent();
- }
- }
- } else {
- routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
- }
- // 4,trigger remote executor
- ReturnT<String> triggerResult = null;
- if (address != null) {
- // remote call to the executor; this is the key step
- triggerResult = runExecutor(triggerParam, address);
- } else {
- triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
- }
- // 5,collection trigger info
- StringBuilder triggerMsgSb = new StringBuilder();
- triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
- .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
- if (shardingParam != null) {
- triggerMsgSb.append("("+shardingParam+")");
- }
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
- triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\">>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<<</span><br>")
- .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
- // 6,save log trigger-info
- jobLog.setExecutorAddress(address);
- jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
- jobLog.setExecutorParam(jobInfo.getExecutorParam());
- jobLog.setExecutorShardingParam(shardingParam);
- jobLog.setExecutorFailRetryCount(finalFailRetryCount);
- //jobLog.setTriggerTime();
- jobLog.setTriggerCode(triggerResult.getCode());
- jobLog.setTriggerMsg(triggerMsgSb.toString());
- XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
- logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
- }
在某些场景下, 任务必须保证最多执行一次: 宁可不执行, 也不允许重复执行. 例如发放优惠券, 可以少发或者不发, 但是不能多发. 对于这类任务, 需要把它固定到某一个执行器服务 IP 上执行, 所以在原有路由策略的基础上增加了一个「固定 IP」策略.
组装好参数, 选择了执行任务地址, 进入 runExecutor 方法
创建好了 RPC 的客户端对象, 在创建对象过程中使用了 NettyHttp 协议, HESSIAN 序列化, 就可以发起 RPC 请求了
- /**
- * Returns the RPC proxy for the executor at the given address, creating and caching it on first use.
- *
- * @param address executor address; surrounding whitespace is ignored
- * @return the cached or newly created proxy, or null when the address is null or blank
- * @throws Exception if the proxy cannot be created
- */
- public static ExecutorBiz getExecutorBiz(String address) throws Exception {
- // a null or blank address cannot be resolved to an executor
- if (address == null) {
- return null;
- }
- address = address.trim();
- if (address.length() == 0) {
- return null;
- }
- // reuse the proxy created earlier for this address, if any
- ExecutorBiz cached = executorBizRepository.get(address);
- if (cached != null) {
- return cached;
- }
- // build the proxy for ExecutorBiz; this is where the RPC client is set up
- XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(
- NetEnum.NETTY_HTTP, // transport
- Serializer.SerializeEnum.HESSIAN.getSerializer(), // serialization
- CallType.SYNC, // synchronous call
- LoadBalance.ROUND,
- ExecutorBiz.class,
- null,
- 5000, // presumably the call timeout in ms - TODO confirm
- address,
- XxlJobAdminConfig.getAdminConfig().getAccessToken(),
- null,
- null);
- ExecutorBiz created = (ExecutorBiz) referenceBean.getObject();
- // remember the proxy so later calls for the same address reuse it
- executorBizRepository.put(address, created);
- return created;
- }
然后发起 RPC 请求: executorBiz.run
- /**
- * Handles one trigger request: resolves the job handler for the request's glue type, applies the
- * block strategy to an already registered job thread, registers a new job thread when none is
- * usable, and pushes the request onto that thread's trigger queue.
- *
- * @param triggerParam the trigger request (job id, handler name, glue type and source, block strategy, ...)
- * @return a FAIL_CODE result when the handler cannot be found, the glue source fails to load, the
- * glue type is invalid, or the DISCARD_LATER strategy drops the request; otherwise the
- * result of pushing the request onto the job thread's queue
- */
- public ReturnT<String> run(TriggerParam triggerParam) {
- // load old:jobHandler + jobThread
- // Look up the job thread registered for this job id; on the first trigger there is none and
- // jobThread is null. (Per the original author's note, the registry is essentially a
- // ConcurrentHashMap<Integer, JobThread>; its declaration is not visible here.)
- JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
- IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
- String removeOldReason = null;
- // valid:jobHandler + jobThread
- // Branch on the glue (run) type; the BEAN (Java) branch is the one of main interest here.
- GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
- if (GlueTypeEnum.BEAN == glueTypeEnum) {
- // new jobhandler
- // Look up the handler instance by the handler name carried in the request.
- // (Per the original author's note, all @JobHandler instances are put into a map by name
- // when the executor starts.)
- IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
- // valid old jobThread
- // If the job's handler was changed, drop the old thread and handler here; the newly
- // looked-up handler is used below.
- if (jobThread!=null && jobHandler != newJobHandler) {
- // change handler, need kill old thread
- removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
- jobThread = null;
- jobHandler = null;
- }
- // valid handler
- if (jobHandler == null) {
- jobHandler = newJobHandler;
- if (jobHandler == null) {
- return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
- }
- }
- } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
- // valid old jobThread
- // The old thread is only kept when it runs a GlueJobHandler with the same glue update time.
- if (jobThread != null &&
- !(jobThread.getHandler() instanceof GlueJobHandler
- && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
- // change handler or gluesource updated, need kill old thread
- removeOldReason = "change job source or glue type, and terminate the old job thread.";
- jobThread = null;
- jobHandler = null;
- }
- // valid handler
- if (jobHandler == null) {
- try {
- IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
- jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
- }
- }
- } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
- // valid old jobThread
- // The old thread is only kept when it runs a ScriptJobHandler with the same glue update time.
- if (jobThread != null &&
- !(jobThread.getHandler() instanceof ScriptJobHandler
- && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
- // change script or gluesource updated, need kill old thread
- removeOldReason = "change job source or glue type, and terminate the old job thread.";
- jobThread = null;
- jobHandler = null;
- }
- // valid handler
- if (jobHandler == null) {
- jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
- }
- } else {
- return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
- }
- // executor block strategy
- // Only relevant when a usable job thread already exists for this job.
- if (jobThread != null) {
- ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
- if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
- // discard when running
- if (jobThread.isRunningOrHasQueue()) {
- return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
- }
- } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
- // kill running jobThread
- if (jobThread.isRunningOrHasQueue()) {
- removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
- jobThread = null;
- }
- } else {
- // just queue trigger
- }
- }
- // replace thread (new or exists invalid)
- // When jobThread is null, register a new job thread for this job id, passing the handler,
- // the reason the old thread was dropped, and the request's empty-loop limit.
- // (Per the original author's note, the new thread is started and begins polling its
- // trigger queue; see JobThread.run.)
- if (jobThread == null) {
- jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason, triggerParam.getJobEmptyLoopNum());
- }
- // push data to queue
- // A job thread exists at this point; hand the request to its trigger queue, which the
- // thread polls.
- ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
- return pushResult;
- }
至此调度中心, 基本处理完成. 然后看执行器的操作流程.
执行器的流程
1, 执行器部署启动会执行 XxlJobSpringExecutor.start 方法:
然后执行 XxlJobExecutor.start 方法:
- /**
- * Boots the executor: initializes the log path, the admin clients and the two helper threads,
- * resolves the port and IP to listen on, and finally starts the RPC provider.
- *
- * @throws Exception if any of the start-up steps fails
- */
- public void start() throws Exception {
- // prepare the local path used for job log files
- XxlJobFileAppender.initLogPath(logPath);
- // build the admin (scheduling center) clients from the configured address list
- initAdminBizList(adminAddresses, accessToken);
- // start the job log file clean-up thread
- JobLogFileCleanThread.getInstance().start(logRetentionDays);
- // start the trigger callback thread
- TriggerCallbackThread.getInstance().start();
- // no port configured: ask NetUtil for an available one (9999 is the default handed to it)
- if (port <= 0) {
- port = NetUtil.findAvailablePort(9999);
- }
- // no IP configured: fall back to the one reported by IpUtil
- if (ip == null || ip.trim().length() == 0) {
- ip = IpUtil.getIp();
- }
- // start the executor's RPC service on the resolved ip and port
- initRpcProvider(ip, port, appName, accessToken);
- }
RPC 服务启动了, 就可以正常提供执行器服务了.
任务线程如何处理任务.
JobThread 就是一个线程
重点看一下线程的 run 方法: 它会循环从任务队列里获取任务, 每次获取的超时时间是 3 秒; 默认连续 30 次没有取到任务就停止线程. 这里的 30 次已经做了定制化修改, 改为由 jobEmptyLoopNum 参数控制.
- /**
- * Main loop of the job thread. After initializing the handler it repeatedly polls the trigger
- * queue (3 second timeout per poll), executes each trigger it receives, with a time limit when
- * the trigger carries a positive executor timeout, and pushes the result to the callback thread.
- * When the number of consecutive empty polls exceeds jobEmptyLoopNum, the thread asks
- * XxlJobExecutor to remove it. Once the loop ends, every trigger still queued is reported as
- * killed and the handler is destroyed.
- */
- public void run() {
- // init
- try {
- // initialize the job handler
- handler.init();
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- }
- // execute
- while(!toStop){
- running = false;
- idleTimes++;// count this poll; reset to 0 below when a trigger is received
- TriggerParam triggerParam = null;
- ReturnT<String> executeResult = null;
- try {
- // take the next trigger from the queue, waiting at most 3 seconds
- triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
- if (triggerParam!=null) {
- running = true;
- idleTimes = 0;
- triggerLogIdSet.remove(triggerParam.getLogId());
- // log filename, like "logPath/yyyy-MM-dd/9999.log"
- String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
- XxlJobFileAppender.contextHolder.set(logFileName);
- ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
- // execute
- XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
- // execution with a time limit (executor timeout is in seconds)
- if (triggerParam.getExecutorTimeout()> 0) {
- // limit timeout
- Thread futureThread = null;
- try {
- final TriggerParam triggerParamTmp = triggerParam;
- FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
- @Override
- public ReturnT<String> call() throws Exception {
- // run the job
- return handler.execute(triggerParamTmp.getExecutorParams());
- }
- });
- // run the job on a separate thread so the wait below can time out
- futureThread = new Thread(futureTask);
- futureThread.start();
- // wait for the result, at most the configured timeout
- executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
- XxlJobLogger.log(e);
- executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout");
- } finally {
- // interrupt the worker thread whether it finished, failed or timed out
- futureThread.interrupt();
- }
- } else {
- // just execute
- // no timeout configured: run the job directly on this thread
- executeResult = handler.execute(triggerParam.getExecutorParams());
- }
- if (executeResult == null) {
- // a handler that returns null is treated as failed
- executeResult = IJobHandler.FAIL;
- } else {
- // truncate messages longer than 50000 characters
- executeResult.setMsg(
- (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
- ?executeResult.getMsg().substring(0, 50000).concat("...")
- :executeResult.getMsg());
- executeResult.setContent(null); // limit obj size
- }
- XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
- } else {
- // Nothing was received. Once the number of consecutive empty polls exceeds
- // jobEmptyLoopNum (roughly 3 * jobEmptyLoopNum seconds of idle polling), ask the
- // executor to remove this job thread. Presumably that sets the stop flag and ends
- // the loop; removeJobThread is not visible here.
- XxlJobLogger.log("<br>----------- xxl-job loop num diy set Param:" + jobEmptyLoopNum);
- if (idleTimes> jobEmptyLoopNum) {
- XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
- }
- }
- } catch (Throwable e) {
- if (toStop) {
- XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
- }
- // report the full stack trace as the failure message
- StringWriter stringWriter = new StringWriter();
- e.printStackTrace(new PrintWriter(stringWriter));
- String errorMsg = stringWriter.toString();
- executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
- XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
- } finally {
- if(triggerParam != null) {
- // callback handler info
- if (!toStop) {
- // normal case: report the execution result
- TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
- } else {
- // is killed
- ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + "[job running,killed]");
- TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
- }
- }
- }
- }
- // callback trigger request in queue
- // the loop has ended: report every trigger still waiting in the queue as killed
- while(triggerQueue !=null && triggerQueue.size()>0){
- TriggerParam triggerParam = triggerQueue.poll();
- if (triggerParam!=null) {
- // is killed
- ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + "[job not executed, in the job queue, killed.]");
- TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
- }
- }
- // destroy
- try {
- handler.destroy();
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- }
- logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
- }
来源: http://www.bubuko.com/infodetail-3107482.html