之前写过 quartz 或者引用过 quartz 的一些文章, 有很多人给我发消息问 quartz 的相关问题,
quartz 报错: java.lang.classNotFoundException
quartz 源码分析之深刻理解 job,sheduler,calendar,trigger 及 listener 之间的关系
Quartz 框架多个 trigger 任务执行出现漏执行的问题分析 -- 转
quartz 集群调度机制调研及源码分析 --- 转载
分布式定时任务调度系统技术选型 -- 转
趁着年底比较清闲, 把 quartz 的问题整理了一下, 顺带翻了翻源码, 做了一些总结, 希望能帮助到一些人或者减少人们探索的时间.
注意, 使用版本为 quartz2.2.3 spring boot2.1.3
1.quartz 的核心组件
1.1 Job 组件
1.1.1Job
Job 负责任务执行的逻辑, 所有逻辑在 execute() 方法中, 执行所需要的数据存放在 JobExecutionContext 中
Job 实例:
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class ColorJob implements Job {
- private static Logger _log = LoggerFactory.getLogger(ColorJob.class);
- // parameter names specific to this job
- public static final String FAVORITE_COLOR = "favorite color";
- public static final String EXECUTION_COUNT = "count";
- // Since Quartz will re-instantiate a class every time it
- // gets executed, members non-static member variables can
- // not be used to maintain state!
- private int _counter = 1;
- /**
- * <p>
- * Empty constructor for job initialization
- * </p>
- * <p>
- * Quartz requires a public empty constructor so that the
- * scheduler can instantiate the class whenever it needs.
- * </p>
- */
- public ColorJob() {
- }
- /**
- * <p>
- * Called by the <code>{@link org.quartz.Scheduler}</code> when a
- * <code>{@link org.quartz.Trigger}</code> fires that is associated with
- * the <code>Job</code>.
- * </p>
- *
- * @throws JobExecutionException
- * if there is an exception while executing the job.
- */
- public void execute(JobExecutionContext context)
- throws JobExecutionException {
- // This job simply prints out its job name and the
- // date and time that it is running
- JobKey jobKey = context.getJobDetail().getKey();
- // Grab and print passed parameters
- JobDataMap data = context.getJobDetail().getJobDataMap();
- String favoriteColor = data.getString(FAVORITE_COLOR);
- int count = data.getInt(EXECUTION_COUNT);
- _log.info("ColorJob:" + jobKey + "executing at" + new Date() + "\n" +
- "favorite color is" + favoriteColor + "\n" +
- "execution count (from job map) is" + count + "\n" +
- "execution count (from job member variable) is" + _counter);
- // increment the count and store it back into the
- // job map so that job state can be properly maintained
- count++;
- data.put(EXECUTION_COUNT, count);
- // Increment the local member variable
- // This serves no real purpose since job state can not
- // be maintained via member variables!
- _counter++;
- }
- }
1.1.2 JobDetail 存储 Job 的信息
主要负责
1. 指定执行的 Job 类, 唯一标识 (job 名称和组别 名称)
2. 存储 JobDataMap 信息
- // job1 will only run 5 times (at start time, plus 4 repeats), every 10 seconds
- JobDetail job1 = newJob(ColorJob.class).withIdentity("job1", "group1").build();
- // pass initialization parameters into the job
- job1.getJobDataMap().put(ColorJob.FAVORITE_COLOR, "Green");
- job1.getJobDataMap().put(ColorJob.EXECUTION_COUNT, 1);
数据库存储如下:
1.1.3 Quartz JobBuilder 提供了一个链式 API 创建 JobDetail
- @Bean
- public JobDetail jobDetail() {
- return JobBuilder.newJob().ofType(SampleJob.class)
- .storeDurably()
- .withIdentity("Qrtz_Job_Detail")
- .withDescription("Invoke Sample Job service...")
- .build();
- }
- 1.1.4 Spring JobDetailFactoryBean
spring 提供的一个创建 JobDetail 的方式工厂 bean
- @Bean
- public JobDetailFactoryBean jobDetail() {
- JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
- jobDetailFactory.setJobClass(SampleJob.class);
- jobDetailFactory.setDescription("Invoke Sample Job service...");
- jobDetailFactory.setDurability(true);
- return jobDetailFactory;
- }
1.2 Trigger 组件
trigger 的状态不同
trigger 的状态
- // STATES
- String STATE_WAITING = "WAITING";
- String STATE_ACQUIRED = "ACQUIRED";
- String STATE_EXECUTING = "EXECUTING";
- String STATE_COMPLETE = "COMPLETE";
- String STATE_BLOCKED = "BLOCKED";
- String STATE_ERROR = "ERROR";
- String STATE_PAUSED = "PAUSED";
- String STATE_PAUSED_BLOCKED = "PAUSED_BLOCKED";
- String STATE_DELETED = "DELETED";
状态的表结构
trigger 的类型
- // TRIGGER TYPES
- /** Simple Trigger type. */
- String TTYPE_SIMPLE = "SIMPLE";
- /** Cron Trigger type. */
- String TTYPE_CRON = "CRON";
- /** Calendar Interval Trigger type. */
- String TTYPE_CAL_INT = "CAL_INT";
- /** Daily Time Interval Trigger type. */
- String TTYPE_DAILY_TIME_INT = "DAILY_I";
- /** A general blob Trigger type. */
- String TTYPE_BLOB = "BLOB";
对应表结构
1.2.1 trigger 实例
- SimpleTrigger trigger1 = newTrigger().withIdentity("trigger1", "group1").startAt(startTime)
- .withSchedule(simpleSchedule().withIntervalInSeconds(10).withRepeatCount(4)).build();
Trigger 存储在 MySQL 中
1.2.2 Quartz TriggerBuilder
提供了一个链式创建 Trigger 的 API
- @Bean
- public Trigger trigger(JobDetail job) {
- return TriggerBuilder.newTrigger().forJob(job)
- .withIdentity("Qrtz_Trigger")
- .withDescription("Sample trigger")
- .withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1))
- .build();
- }
- 1.2.3 Spring SimpleTriggerFactoryBean
spring 提供的一个创建 SimpleTrigger 的工厂类
- @Bean
- public SimpleTriggerFactoryBean trigger(JobDetail job) {
- SimpleTriggerFactoryBean trigger = new SimpleTriggerFactoryBean();
- trigger.setJobDetail(job);
- trigger.setRepeatInterval(3600000);
- trigger.setRepeatCount(SimpleTrigger.REPEAT_INDEFINITELY);
- return trigger;
- }
1.3 调度组件
1.3.1 quartz 提供的工厂类
- @Bean
- public Scheduler scheduler(Trigger trigger, JobDetail job) {
- StdSchedulerFactory factory = new StdSchedulerFactory();
- factory.initialize(new ClassPathResource("quartz.properties").getInputStream());
- Scheduler scheduler = factory.getScheduler();
- scheduler.setJobFactory(springBeanJobFactory());
- scheduler.scheduleJob(job, trigger);
- scheduler.start();
- return scheduler;
- }
1.3.2 spring 提供的工厂 bean
- @Bean
- public SchedulerFactoryBean scheduler(Trigger trigger, JobDetail job) {
- SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
- schedulerFactory.setConfigLocation(new ClassPathResource("quartz.properties"));
- schedulerFactory.setJobFactory(springBeanJobFactory());
- schedulerFactory.setJobDetails(job);
- schedulerFactory.setTriggers(trigger);
- return schedulerFactory;
- }
2. 工作原理
2.1 核心类 QuartzScheduler
Scheduler 实现类 StdScheduler 封装了核心工作类 QuartzScheduler
- /**
- * <p>
- * Construct a <code>StdScheduler</code> instance to proxy the given
- * <code>QuartzScheduler</code> instance, and with the given <code>SchedulingContext</code>.
- * </p>
- */
- public StdScheduler(QuartzScheduler sched) {
- this.sched = sched;
- }
2.2 JobDetail 的存取
- public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) throws SchedulerException {
- validateState();
- if (!storeNonDurableWhileAwaitingScheduling && !jobDetail.isDurable()) {
- throw new SchedulerException(
- "Jobs added with no trigger must be durable.");
- }
- resources.getJobStore().storeJob(jobDetail, replace);
- notifySchedulerThread(0L);
- notifySchedulerListenersJobAdded(jobDetail);
- }
2.2.1 存储 JobDetail 信息 (以 MySQL Jdbc 方式为例)
- /**
- * <p>
- * Insert or update a job.
- * </p>
- */
- protected void storeJob(Connection conn,
- JobDetail newJob, boolean replaceExisting)
- throws JobPersistenceException {
- boolean existingJob = jobExists(conn, newJob.getKey());
- try {
- if (existingJob) {
- if (!replaceExisting) {
- throw new ObjectAlreadyExistsException(newJob);
- }
- getDelegate().updateJobDetail(conn, newJob);
- } else {
- getDelegate().insertJobDetail(conn, newJob);
- }
- } catch (IOException e) {
- throw new JobPersistenceException("Couldn't store job: "
- + e.getMessage(), e);
- } catch (SQLException e) {
- throw new JobPersistenceException("Couldn't store job: "
- + e.getMessage(), e);
- }
- }
调用 StdJDBCDelegate 实现
- /**
- * <p>
- * Insert the job detail record.
- * </p>
- *
- * @param conn
- * the DB Connection
- * @param job
- * the job to insert
- * @return number of rows inserted
- * @throws IOException
- * if there were problems serializing the JobDataMap
- */
- public int insertJobDetail(Connection conn, JobDetail job)
- throws IOException, SQLException {
- ByteArrayOutputStream baos = serializeJobData(job.getJobDataMap());
- PreparedStatement ps = null;
- int insertResult = 0;
- try {
- ps = conn.prepareStatement(rtp(INSERT_JOB_DETAIL));
- ps.setString(1, job.getKey().getName());
- ps.setString(2, job.getKey().getGroup());
- ps.setString(3, job.getDescription());
- ps.setString(4, job.getJobClass().getName());
- setBoolean(ps, 5, job.isDurable());
- setBoolean(ps, 6, job.isConcurrentExectionDisallowed());
- setBoolean(ps, 7, job.isPersistJobDataAfterExecution());
- setBoolean(ps, 8, job.requestsRecovery());
- setBytes(ps, 9, baos);
- insertResult = ps.executeUpdate();
- } finally {
- closeStatement(ps);
- }
- return insertResult;
- }
注意: JobDataMap 序列化后以 Blob 形式存储到数据库中
StdJDBCConstants 中执行 sql 如下:
- String INSERT_JOB_DETAIL = "INSERT INTO"
- + TABLE_PREFIX_SUBST + TABLE_JOB_DETAILS + "("
- + COL_SCHEDULER_NAME + "," + COL_JOB_NAME
- + "," + COL_JOB_GROUP + "," + COL_DESCRIPTION + ","
- + COL_JOB_CLASS + "," + COL_IS_DURABLE + ","
- + COL_IS_NONCONCURRENT + "," + COL_IS_UPDATE_DATA + ","
- + COL_REQUESTS_RECOVERY + ","
- + COL_JOB_DATAMAP + ")" + "VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?)";
2.2.2 查询 JobDetail
强调一下, 因 JobDetail 中的 JobDataMap 是以 Blob 形式存放到数据库中的 (也可以通过 useProperties 属性修改成 string 存储, 默认是 false,Blob 形式存储), 所以查询时需要特殊处理: StdJDBCDelegate.java
- /**
- * <p>
- * Select the JobDetail object for a given job name / group name.
- * </p>
- *
- * @param conn
- * the DB Connection
- * @return the populated JobDetail object
- * @throws ClassNotFoundException
- * if a class found during deserialization cannot be found or if
- * the job class could not be found
- * @throws IOException
- * if deserialization causes an error
- */
- public JobDetail selectJobDetail(Connection conn, JobKey jobKey,
- ClassLoadHelper loadHelper)
- throws ClassNotFoundException, IOException, SQLException {
- PreparedStatement ps = null;
- ResultSet rs = null;
- try {
- ps = conn.prepareStatement(rtp(SELECT_JOB_DETAIL));
- ps.setString(1, jobKey.getName());
- ps.setString(2, jobKey.getGroup());
- rs = ps.executeQuery();
- JobDetailImpl job = null;
- if (rs.next()) {
- job = new JobDetailImpl();
- job.setName(rs.getString(COL_JOB_NAME));
- job.setGroup(rs.getString(COL_JOB_GROUP));
- job.setDescription(rs.getString(COL_DESCRIPTION));
- job.setJobClass( loadHelper.loadClass(rs.getString(COL_JOB_CLASS), Job.class));
- job.setDurability(getBoolean(rs, COL_IS_DURABLE));
- job.setRequestsRecovery(getBoolean(rs, COL_REQUESTS_RECOVERY));
- Map<?, ?> map = null;
- if (canUseProperties()) {
- map = getMapFromProperties(rs);
- } else {
- map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);
- }
- if (null != map) {
- job.setJobDataMap(new JobDataMap(map));
- }
- }
- return job;
- } finally {
- closeResultSet(rs);
- closeStatement(ps);
- }
- }
2.3 查询 trigger
- /**
- * <p>
- * Retrieve the given <code>{@link org.quartz.Trigger}</code>.
- * </p>
- *
- * @return The desired <code>Trigger</code>, or null if there is no
- * match.
- */
- public OperableTrigger retrieveTrigger(final TriggerKey triggerKey) throws JobPersistenceException {
- return (OperableTrigger)executeWithoutLock( // no locks necessary for read...
- new TransactionCallback() {
- public Object execute(Connection conn) throws JobPersistenceException {
- return retrieveTrigger(conn, triggerKey);
- }
- });
- }
- protected OperableTrigger retrieveTrigger(Connection conn, TriggerKey key)
- throws JobPersistenceException {
- try {
- return getDelegate().selectTrigger(conn, key);
- } catch (Exception e) {
- throw new JobPersistenceException("Couldn't retrieve trigger: "
- + e.getMessage(), e);
- }
- }
StdJDBCDelegate.java
- /**
- * <p>
- * Select a trigger.
- * </p>
- *
- * @param conn
- * the DB Connection
- * @return the <code>{@link org.quartz.Trigger}</code> object
- * @throws JobPersistenceException
- */
- public OperableTrigger selectTrigger(Connection conn, TriggerKey triggerKey) throws SQLException, ClassNotFoundException,
- IOException, JobPersistenceException {
- PreparedStatement ps = null;
- ResultSet rs = null;
- try {
- OperableTrigger trigger = null;
- ps = conn.prepareStatement(rtp(SELECT_TRIGGER));
- ps.setString(1, triggerKey.getName());
- ps.setString(2, triggerKey.getGroup());
- rs = ps.executeQuery();
- if (rs.next()) {
- String jobName = rs.getString(COL_JOB_NAME);
- String jobGroup = rs.getString(COL_JOB_GROUP);
- String description = rs.getString(COL_DESCRIPTION);
- long nextFireTime = rs.getLong(COL_NEXT_FIRE_TIME);
- long prevFireTime = rs.getLong(COL_PREV_FIRE_TIME);
- String triggerType = rs.getString(COL_TRIGGER_TYPE);
- long startTime = rs.getLong(COL_START_TIME);
- long endTime = rs.getLong(COL_END_TIME);
- String calendarName = rs.getString(COL_CALENDAR_NAME);
- int misFireInstr = rs.getInt(COL_MISFIRE_INSTRUCTION);
- int priority = rs.getInt(COL_PRIORITY);
- Map<?, ?> map = null;
- if (canUseProperties()) {
- map = getMapFromProperties(rs);
- } else {
- map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);
- }
- Date nft = null;
- if (nextFireTime> 0) {
- nft = new Date(nextFireTime);
- }
- Date pft = null;
- if (prevFireTime> 0) {
- pft = new Date(prevFireTime);
- }
- Date startTimeD = new Date(startTime);
- Date endTimeD = null;
- if (endTime> 0) {
- endTimeD = new Date(endTime);
- }
- if (triggerType.equals(TTYPE_BLOB)) {
- rs.close(); rs = null;
- ps.close(); ps = null;
- ps = conn.prepareStatement(rtp(SELECT_BLOB_TRIGGER));
- ps.setString(1, triggerKey.getName());
- ps.setString(2, triggerKey.getGroup());
- rs = ps.executeQuery();
- if (rs.next()) {
- trigger = (OperableTrigger) getObjectFromBlob(rs, COL_BLOB);
- }
- }
- else {
- TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(triggerType);
- if(tDel == null)
- throw new JobPersistenceException("No TriggerPersistenceDelegate for trigger discriminator type:" + triggerType);
- TriggerPropertyBundle triggerProps = null;
- try {
- triggerProps = tDel.loadExtendedTriggerProperties(conn, triggerKey);
- } catch (IllegalStateException isex) {
- if (isTriggerStillPresent(ps)) {
- throw isex;
- } else {
- // QTZ-386 Trigger has been deleted
- return null;
- }
- }
- TriggerBuilder<?> tb = newTrigger()
- .withDescription(description)
- .withPriority(priority)
- .startAt(startTimeD)
- .endAt(endTimeD)
- .withIdentity(triggerKey)
- .modifiedByCalendar(calendarName)
- .withSchedule(triggerProps.getScheduleBuilder())
- .forJob(jobKey(jobName, jobGroup));
- if (null != map) {
- tb.usingJobData(new JobDataMap(map));
- }
- trigger = (OperableTrigger) tb.build();
- trigger.setMisfireInstruction(misFireInstr);
- trigger.setNextFireTime(nft);
- trigger.setPreviousFireTime(pft);
- setTriggerStateProperties(trigger, triggerProps);
- }
- }
- return trigger;
- } finally {
- closeResultSet(rs);
- closeStatement(ps);
- }
- }
执行的 sql:
- String SELECT_TRIGGER = "SELECT * FROM"
- + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + "WHERE"
- + COL_SCHEDULER_NAME + "=" + SCHED_NAME_SUBST
- + "AND" + COL_TRIGGER_NAME + "= ? AND" + COL_TRIGGER_GROUP + "= ?";
和 JobDetail 一样, 也存在 Blob 的问题, 不再赘述.
2.4 调度执行线程 QuartzSchedulerThread
- /**
- * <p>
- * The main processing loop of the <code>QuartzSchedulerThread</code>.
- * </p>
- */
- @Override
- public void run() {
- boolean lastAcquireFailed = false;
- while (!halted.get()) {
- try {
- // check if we're supposed to pause...
- synchronized (sigLock) {
- while (paused && !halted.get()) {
- try {
- // wait until togglePause(false) is called...
- sigLock.wait(1000L);
- } catch (InterruptedException ignore) {
- }
- }
- if (halted.get()) {
- break;
- }
- }
- int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
- if(availThreadCount> 0) { // will always be true, due to semantics of blockForAvailableThreads...
- List<OperableTrigger> triggers = null;
- long now = System.currentTimeMillis();
- clearSignaledSchedulingChange();
- try {
- triggers = qsRsrcs.getJobStore().acquireNextTriggers(
- now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); //1.
- lastAcquireFailed = false;
- if (log.isDebugEnabled())
- log.debug("batch acquisition of" + (triggers == null ? 0 : triggers.size()) + "triggers");
- } catch (JobPersistenceException jpe) {
- if(!lastAcquireFailed) {
- qs.notifySchedulerListenersError(
- "An error occurred while scanning for the next triggers to fire.",
- jpe);
- }
- lastAcquireFailed = true;
- continue;
- } catch (RuntimeException e) {
- if(!lastAcquireFailed) {
- getLog().error("quartzSchedulerThreadLoop: RuntimeException"
- +e.getMessage(), e);
- }
- lastAcquireFailed = true;
- continue;
- }
- if (triggers != null && !triggers.isEmpty()) {
- now = System.currentTimeMillis();
- long triggerTime = triggers.get(0).getNextFireTime().getTime();
- long timeUntilTrigger = triggerTime - now;
- while(timeUntilTrigger> 2) {
- synchronized (sigLock) {
- if (halted.get()) {
- break;
- }
- if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
- try {
- // we could have blocked a long while
- // on 'synchronize', so we must recompute
- now = System.currentTimeMillis();
- timeUntilTrigger = triggerTime - now;
- if(timeUntilTrigger>= 1)
- sigLock.wait(timeUntilTrigger);
- } catch (InterruptedException ignore) {
- }
- }
- }
- if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
- break;
- }
- now = System.currentTimeMillis();
- timeUntilTrigger = triggerTime - now;
- }
- // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
- if(triggers.isEmpty())
- continue;
- // set triggers to 'executing'
- List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
- boolean goAhead = true;
- synchronized(sigLock) {
- goAhead = !halted.get();
- }
- if(goAhead) {
- try {
- List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); //2
- if(res != null)
- bndles = res;
- } catch (SchedulerException se) {
- qs.notifySchedulerListenersError(
- "An error occurred while firing triggers'"
- + triggers + "'", se);
- //QTZ-179 : a problem occurred interacting with the triggers from the db
- //we release them and loop again
- for (int i = 0; i <triggers.size(); i++) {
- qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
- }
- continue;
- }
- }
- for (int i = 0; i < bndles.size(); i++) {
- TriggerFiredResult result = bndles.get(i);
- TriggerFiredBundle bndle = result.getTriggerFiredBundle();
- Exception exception = result.getException();
- if (exception instanceof RuntimeException) {
- getLog().error("RuntimeException while firing trigger" + triggers.get(i), exception);
- qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
- continue;
- }
- // it's possible to get'null' if the triggers was paused,
- // blocked, or other similar occurrences that prevent it being
- // fired at this time... or if the scheduler was shutdown (halted)
- if (bndle == null) {
- qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
- continue;
- }
- JobRunShell shell = null;
- try {
- shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
- shell.initialize(qs);
- } catch (SchedulerException se) {
- qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
- continue;
- }
- if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
- // this case should never happen, as it is indicative of the
- // scheduler being shutdown or a bug in the thread pool or
- // a thread pool being used concurrently - which the docs
- // say not to do...
- getLog().error("ThreadPool.runInThread() return false!");
- qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
- }
- }
- continue; // while (!halted)
- }
- } else { // if(availThreadCount> 0)
- // should never happen, if threadPool.blockForAvailableThreads() follows contract
- continue; // while (!halted)
- }
- long now = System.currentTimeMillis();
- long waitTime = now + getRandomizedIdleWaitTime();
- long timeUntilContinue = waitTime - now;
- synchronized(sigLock) {
- try {
- if(!halted.get()) {
- // QTZ-336 A job might have been completed in the mean time and we might have
- // missed the scheduled changed signal by not waiting for the notify() yet
- // Check that before waiting for too long in case this very job needs to be
- // scheduled very soon
- if (!isScheduleChanged()) {
- sigLock.wait(timeUntilContinue);
- }
- }
- } catch (InterruptedException ignore) {
- }
- }
- } catch(RuntimeException re) {
- getLog().error("Runtime error occurred in main trigger firing loop.", re);
- }
- } // while (!halted)
- // drop references to scheduler stuff to aid garbage collection...
- qs = null;
- qsRsrcs = null;
- }
2.4.1 获取 trigger(红色 1)
- protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
- throws JobPersistenceException {
- if (timeWindow <0) {
- throw new IllegalArgumentException();
- }
- List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
- Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
- final int MAX_DO_LOOP_RETRY = 3;
- int currentLoopCount = 0;
- do {
- currentLoopCount ++;
- try {
- List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
- // No trigger is ready to fire yet.
- if (keys == null || keys.size() == 0)
- return acquiredTriggers;
- long batchEnd = noLaterThan;
- for(TriggerKey triggerKey: keys) {
- // If our trigger is no longer available, try a new one.
- OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
- if(nextTrigger == null) {
- continue; // next trigger
- }
- // If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
- // put it back into the timeTriggers set and continue to search for next trigger.
- JobKey jobKey = nextTrigger.getJobKey();
- JobDetail job;
- try {
- job = retrieveJob(conn, jobKey);
- } catch (JobPersistenceException jpe) {
- try {
- getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
- getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);
- } catch (SQLException sqle) {
- getLog().error("Unable to set trigger state to ERROR.", sqle);
- }
- continue;
- }
- if (job.isConcurrentExectionDisallowed()) {
- if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
- continue; // next trigger
- } else {
- acquiredJobKeysForNoConcurrentExec.add(jobKey);
- }
- }
- if (nextTrigger.getNextFireTime().getTime()> batchEnd) {
- break;
- }
- // We now have a acquired trigger, let's add to return list.
- // If our trigger was no longer in the expected state, try a new one.
- int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
- if (rowsUpdated <= 0) {
- continue; // next trigger
- }
- nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
- getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
- if(acquiredTriggers.isEmpty()) {
- batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
- }
- acquiredTriggers.add(nextTrigger);
- }
- // if we didn't end up with any trigger to fire from that first
- // batch, try again for another batch. We allow with a max retry count.
- if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
- continue;
- }
- // We are done with the while loop.
- break;
- } catch (Exception e) {
- throw new JobPersistenceException(
- "Couldn't acquire next trigger: " + e.getMessage(), e);
- }
- } while (true);
- // Return the acquired trigger list
- return acquiredTriggers;
- }
2.4.2 触发 trigger(红色 2)
- protected TriggerFiredBundle triggerFired(Connection conn,
- OperableTrigger trigger)
- throws JobPersistenceException {
- JobDetail job;
- Calendar cal = null;
- // Make sure trigger wasn't deleted, paused, or completed...
- try { // if trigger was deleted, state will be STATE_DELETED
- String state = getDelegate().selectTriggerState(conn,
- trigger.getKey());
- if (!state.equals(STATE_ACQUIRED)) {
- return null;
- }
- } catch (SQLException e) {
- throw new JobPersistenceException("Couldn't select trigger state: "
- + e.getMessage(), e);
- }
- try {
- job = retrieveJob(conn, trigger.getJobKey());
- if (job == null) { return null; }
- } catch (JobPersistenceException jpe) {
- try {
- getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
- getDelegate().updateTriggerState(conn, trigger.getKey(),
- STATE_ERROR);
- } catch (SQLException sqle) {
- getLog().error("Unable to set trigger state to ERROR.", sqle);
- }
- throw jpe;
- }
- if (trigger.getCalendarName() != null) {
- cal = retrieveCalendar(conn, trigger.getCalendarName());
- if (cal == null) { return null; }
- }
- try {
- getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
- } catch (SQLException e) {
- throw new JobPersistenceException("Couldn't insert fired trigger: "
- + e.getMessage(), e);
- }
- Date prevFireTime = trigger.getPreviousFireTime();
- // call triggered - to update the trigger's next-fire-time state...
- trigger.triggered(cal);
- String state = STATE_WAITING;
- boolean force = true;
- if (job.isConcurrentExectionDisallowed()) {
- state = STATE_BLOCKED;
- force = false;
- try {
- getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
- STATE_BLOCKED, STATE_WAITING);
- getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
- STATE_BLOCKED, STATE_ACQUIRED);
- getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
- STATE_PAUSED_BLOCKED, STATE_PAUSED);
- } catch (SQLException e) {
- throw new JobPersistenceException(
- "Couldn't update states of blocked triggers: "
- + e.getMessage(), e);
- }
- }
- if (trigger.getNextFireTime() == null) {
- state = STATE_COMPLETE;
- force = true;
- }
- storeTrigger(conn, trigger, job, true, state, force, false);
- job.getJobDataMap().clearDirtyFlag();
- return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
- .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
- .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
- }
2.4.3 数据库锁
StdRowLockSemaphore 针对支持 select for update 的数据库如 MySQL
UpdateLockRowSemaphore 针对不支持 select for update 的数据库如 mssqlserver
StdRowLockSemaphore 的实现如下:
- public static final String SELECT_FOR_LOCK = "SELECT * FROM"
- + TABLE_PREFIX_SUBST + TABLE_LOCKS + "WHERE" + COL_SCHEDULER_NAME + "=" + SCHED_NAME_SUBST
- + "AND" + COL_LOCK_NAME + "= ? FOR UPDATE";
- public static final String INSERT_LOCK = "INSERT INTO"
- + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + "," + COL_LOCK_NAME + ") VALUES ("
- + SCHED_NAME_SUBST + ", ?)";
总结:
1.quartz 的三大组件 Job/trigger/scheduler,job 负责业务逻辑, trigger 负责执行时机, scheduler 负责调度 Job 和 trigger 来执行.
2. 使用 MySQL 作为存储的话, 使用 StdJDBCDelegate 和数据库进行交互, 交互的 sql 在 StdJDBCConstants 中定义
3.QuartzScheduler 是核心类, Scheduler 做其代理, 真正执行的是 QuartzSchedulerThread
4.JobStore 存储控制, JobStoreSupport 的两个实现 JobStoreCMT 容器管理事务, 不需要使用 commit 和 rollback;JobStoreTX 用在单机环境, 需要处理 commit 和 rollback
5. 数据库锁使用了悲观锁 select for update, 定义为 Semaphore
6.qrtz_scheduler_state 定义了扫描间隔集群扫描间隔
参考文献:
[1] https://www.baeldung.com/spring-quartz-schedule
[2] https://blog.csdn.net/xiaojin21cen/article/details/79298883
来源: https://www.cnblogs.com/davidwang456/p/10329616.html