文本将主要讲述 ThreadPoolExecutor 一个特殊的子类 ScheduledThreadPoolExecutor, 主要用于执行周期性任务; 所以在看本文之前最好先了解一下 ThreadPoolExecutor , 可以参考 ThreadPoolExecutor 详解; 另外 ScheduledThreadPoolExecutor 中使用了延迟队列, 主要是基于完全二叉堆实现的, 可以参考 完全二叉堆;
一, ScheduledThreadPoolExecutor 结构概述
1. 继承关系
- public class ScheduledThreadPoolExecutor
- extends ThreadPoolExecutor implements ScheduledExecutorService {}
在源码中可以看到, ScheduledThreadPoolExecutor 的状态管理, 入队操作, 拒绝操作等都是继承于 ThreadPoolExecutor;ScheduledThreadPoolExecutor 主要是提供了周期任务和延迟任务相关的操作;
- schedule(Runnable command, long delay, TimeUnit unit) // 无返回值的延迟任务
- schedule(Callable callable, long delay, TimeUnit unit) // 有返回值的延迟任务
- scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) // 固定频率周期任务
- scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) // 固定延迟周期任务
就 ScheduledThreadPoolExecutor 的运行逻辑而言, 大致可以表述为:
首先将 Runnable/Callable 封装为 ScheduledFutureTask, 延迟时间作为比较属性;
然后加入 DelayedWorkQueue 队列中, 每次取出队首延迟最小的任务, 超时等待, 然后执行;
最后判断是否为周期任务, 然后将其重新加入 DelayedWorkQueue 队列中;
其内部结构如图所示:
这里需要注意的:
ScheduledThreadPoolExecutor 中的队列不能指定, 只能是 DelayedWorkQueue; 因为他是 无界队列, 所以再添加任务的时候线程最多可以增加到 coreSize, 这里不清楚的可以查看 ThreadPoolExecutor 详解 , 就不再重复了;
ScheduledThreadPoolExecutor 重写了 ThreadPoolExecutor 的 execute() 方法, 其执行的核心方法变成 delayedExecute();
- 2. ScheduledFutureTask
- private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
- private final long sequenceNumber; // 任务序号, 从 AtomicLong sequencer 获取, 当延迟时间相同时, 序号小的先出
- private long time; // 下次任务执行时间
- private final long period; // 0 表示非周期任务, 正值表示固定频率周期任务, 负值表示固定延迟周期任务
- RunnableScheduledFuture<V> outerTask = this; // 重复执行的任务, 传入的任务可以使用 decorateTask() 重新包装
- int heapIndex; // 队列索引
- }
其中最重要的方法必然是 run 方法了:
- public void run() {
- boolean periodic = isPeriodic(); // 是否为周期任务, period != 0
- if (!canRunInCurrentRunState(periodic)) // 当前状态能否继续运行, 详细测试后面还会讲到
- cancel(false); // 取消任务
- else if (!periodic) // 不是周期任务时, 直接运行
- ScheduledFutureTask.super.run();
- else if (ScheduledFutureTask.super.runAndReset()) { // 时周期任务
- setNextRunTime(); // 设置下次执行时间
- reExecutePeriodic(outerTask); // 重新入队
- }
- }
- public boolean cancel(boolean mayInterruptIfRunning) {
- boolean cancelled = super.cancel(mayInterruptIfRunning); // 设置中断状态
- if (cancelled && removeOnCancel && heapIndex>= 0) // 当设置 removeOnCancel 状态时, 移除任务
- remove(this); // 默认为 false
- return cancelled;
- }
- void reExecutePeriodic(RunnableScheduledFuture<?> task) {
- if (canRunInCurrentRunState(true)) { // 如果当前状态可以执行
- super.getQueue().add(task); // 则重新入队
- if (!canRunInCurrentRunState(true) && remove(task))
- task.cancel(false);
- else ensurePrestart(); // 确保有线程执行任务
- }
- }
此外还有 DelayedWorkQueue, 但是这里不准备讲了, 可以查看 完全二叉堆 了解实现的原理;
二, scheduleAtFixedRate 与 scheduleWithFixedDelay
scheduleAtFixedRate 和 scheduleWithFixedDelay 是我们最常用的两个方法, 但是他们的区别可能不是很清楚, 这里重点讲一下,
- 1. scheduleAtFixedRate
- // 测试
- ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
- pool.scheduleAtFixedRate(() -> {
- sleep(1000); // 睡眠 1s,
- log.info("run task");
- }, 1, 2, TimeUnit.SECONDS); // 延迟 1s, 周期 2s
- // 打印
- [19:41:28,489 INFO ] [pool-1-thread-1] - run task
- [19:41:30,482 INFO ] [pool-1-thread-1] - run task
- [19:41:32,483 INFO ] [pool-1-thread-1] - run task
- [19:41:34,480 INFO ] [pool-1-thread-1] - run task
可以看到的确时固定周期 2s 执行的, 但是如果任务执行时间超过周期呢?
- // 测试
- ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
- pool.scheduleAtFixedRate(() -> {
- int i = 2000 + random.nextInt(3) * 1000;
- sleep(i);
- log.info("run task, sleep :{}", i);
- }, 1, 2, TimeUnit.SECONDS); // 延迟 1s, 周期 2s
- // 打印
- [19:42:53,428 INFO ] [pool-1-thread-1] - run task, sleep :2000
- [19:42:55,430 INFO ] [pool-1-thread-1] - run task, sleep :2000
- [19:42:59,430 INFO ] [pool-1-thread-1] - run task, sleep :4000
- [19:43:02,434 INFO ] [pool-1-thread-1] - run task, sleep :3000
- [19:43:06,436 INFO ] [pool-1-thread-1] - run task, sleep :4000
可以看到如果任务执行时间超出周期时, 下一次任务会立刻运行; 就好像周期是一个有弹性的袋子, 能装下运行时间的时候, 是固定大小, 装不下的时候就会被撑大, 图像化表示如下:
- 2. scheduleWithFixedDelay
- // 测试
- ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
- pool.scheduleAtFixedRate(() -> {
- int i = 1000 + random.nextInt(5) * 1000;
- sleep(i);
- log.info("run task, sleep :{}", i);
- }, 1, 2, TimeUnit.SECONDS); // 延迟 1s, 周期 2s
- // 打印
- [20:05:40,682 INFO ] [pool-1-thread-1] - run task, sleep :1000
- [20:05:45,686 INFO ] [pool-1-thread-1] - run task, sleep :3000
- [20:05:49,689 INFO ] [pool-1-thread-1] - run task, sleep :2000
- [20:05:55,690 INFO ] [pool-1-thread-1] - run task, sleep :4000
- [20:06:01,692 INFO ] [pool-1-thread-1] - run task, sleep :4000
可以看到无论执行时间是多少, 其结果都是在执行完毕后, 停顿固定的时间, 然后执行下一次任务, 其图形化表示为:
三, 源码分析
1. 延迟任务
- public void execute(Runnable command) {
- schedule(command, 0, NANOSECONDS);
- }
- public <T> Future<T> submit(Callable<T> task) {
- return schedule(task, 0, NANOSECONDS);
- }
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
- if (command == null || unit == null) throw new NullPointerException();
- RunnableScheduledFuture<?> t = decorateTask(
- command,new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
- delayedExecute(t);
- return t;
- }
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
- if (callable == null || unit == null) throw new NullPointerException();
- RunnableScheduledFuture<V> t = decorateTask(
- callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
- delayedExecute(t);
- return t;
- }
可以看到所有的周期任务, 最终执行的都是 delayedExecute 方法, 其中 decorateTask 是一个钩子函数, 其之类可以利用他对任务进行重构过滤等操作;
- private void delayedExecute(RunnableScheduledFuture<?> task) {
- if (isShutdown()) reject(task); // 如果线程池已经关闭, 则拒绝任务
- else {
- super.getQueue().add(task); // 任务入队
- if (isShutdown() && // 再次检查, 线程池是否关闭
- !canRunInCurrentRunState(task.isPeriodic()) &&
- remove(task))
- task.cancel(false);
- else
- ensurePrestart(); // 确保有线程执行任务
- }
- }
2. 周期任务
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
- long period, TimeUnit unit) {
- if (command == null || unit == null) throw new NullPointerException();
- if (period <= 0) throw new IllegalArgumentException();
- ScheduledFutureTask<Void> sft =
- new ScheduledFutureTask<Void>(command,
- null,
- triggerTime(initialDelay, unit),
- unit.toNanos(period)); // 注意这里添加的是正值
- RunnableScheduledFuture<Void> t = decorateTask(command, sft);
- sft.outerTask = t;
- delayedExecute(t);
- return t;
- }
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
- long delay, TimeUnit unit) {
- if (command == null || unit == null) throw new NullPointerException();
- if (delay <= 0) throw new IllegalArgumentException();
- ScheduledFutureTask<Void> sft =
- new ScheduledFutureTask<Void>(command,
- null,
- triggerTime(initialDelay, unit),
- unit.toNanos(-delay)); // 注意这里添加的是负值
- RunnableScheduledFuture<Void> t = decorateTask(command, sft);
- sft.outerTask = t;
- delayedExecute(t);
- return t;
- }
从上面代码可以看到 scheduleAtFixedRate 和 scheduleWithFixedDelay 只有周期任务的时间不同, 其他的都一样, 那么下面我们看一下他们的任务时间计算;
- public long getDelay(TimeUnit unit) {
- return unit.convert(time - now(), NANOSECONDS);
- }
- private void setNextRunTime() {
- long p = period;
- if (p> 0) // 正值表示 scheduleAtFixedRate
- time += p; // 不管任务执行时间, 直接加上周期时间, 也就是一次任务超时, 会影响后续任务的执行,
- // 超时的时候, getDelay 是负值, 所以在延迟队列中必然排在最前面, 立刻被取出执行
- else
- time = triggerTime(-p); // 计算触发时间
- }
- long triggerTime(long delay) { // 这里可以看到, 每次的确是在当前时间的基础上, 加上延迟时间;
- return now() + ((delay <(Long.MAX_VALUE>> 1)) ? delay : overflowFree(delay));
- }
这里特别要注意 scheduleAtFixedRate 一次任务超时, 会持续影响后面的任务周期安排, 所以在设定周期的时候要特别注意; 例如:
- // 测试
- ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
- pool.scheduleAtFixedRate(() -> {
- int i = random.nextInt(5) * 1000;
- sleep(i);
- log.info("run task, sleep :{}", i);
- }, 1, 2, TimeUnit.SECONDS);
- // 打印
- [20:29:11,310 INFO ] [pool-1-thread-1] - run task, sleep :1000
- [20:29:16,304 INFO ] [pool-1-thread-1] - run task, sleep :4000
- [20:29:19,304 INFO ] [pool-1-thread-1] - run task, sleep :3000
- [20:29:21,305 INFO ] [pool-1-thread-1] - run task, sleep :2000
- [20:29:22,305 INFO ] [pool-1-thread-1] - run task, sleep :1000
- [20:29:23,306 INFO ] [pool-1-thread-1] - run task, sleep :1000
- [20:29:27,306 INFO ] [pool-1-thread-1] - run task, sleep :4000
- [20:29:30,307 INFO ] [pool-1-thread-1] - run task, sleep :3000
如图所示:
3. 取消任务
- private volatile boolean continueExistingPeriodicTasksAfterShutdown; // 关闭后继续执行周期任务, 默认 false
- private volatile boolean executeExistingDelayedTasksAfterShutdown = true; // 关闭后继续执行延迟任务, 默认 true
- private volatile boolean removeOnCancel = false; // 取消任务是, 从队列中删除任务, 默认 false
- @Override void onShutdown() {
- BlockingQueue<Runnable> q = super.getQueue();
- boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); // 继续延迟任务
- boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); // 继续周期任务
- if (!keepDelayed && !keepPeriodic) { // 都是 false, 直接清除
- for (Object e : q.toArray())
- if (e instanceof RunnableScheduledFuture<?>)
- ((RunnableScheduledFuture<?>) e).cancel(false);
- q.clear();
- }
- else {
- // Traverse snapshot to avoid iterator exceptions
- for (Object e : q.toArray()) {
- if (e instanceof RunnableScheduledFuture) {
- RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
- if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
- t.isCancelled()) { // also remove if already cancelled
- if (q.remove(t))
- t.cancel(false);
- }
- }
- }
- }
- tryTerminate();
- }
总结
scheduleAtFixedRate, 固定频率周期任务, 注意一次任务超时, 会持续的影响后续的任务周期;
scheduleWithFixedDelay, 固定延迟周期任务, 即每次任务结束后, 超时等待固定时间;
此外 ScheduledThreadPoolExecutor 线程最多为核心线程, 最大线程数不起作用, 因为 DelayedWorkQueue 是无界队列;
来源: https://www.cnblogs.com/sanzao/p/10760641.html