(手机横屏看源码更方便)
注: java 源码分析部分如无特殊说明均基于 java8 版本.
注: 本文基于 ScheduledThreadPoolExecutor 定时线程池类.
简介
前面我们一起学习了普通任务, 未来任务的执行流程, 今天我们再来学习一种新的任务 -- 定时任务.
定时任务是我们经常会用到的一种任务, 它表示在未来某个时刻执行, 或者未来按照某种规则重复执行的任务.
问题
(1)如何保证任务是在未来某个时刻才被执行?
(2)如何保证任务按照某种规则重复执行?
来个栗子
创建一个定时线程池, 用它来跑四种不同的定时任务.
- public class ThreadPoolTest03 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- // 创建一个定时线程池
- ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5);
- System.out.println("start:" + System.currentTimeMillis());
- // 执行一个无返回值任务, 5 秒后执行, 只执行一次
- scheduledThreadPoolExecutor.schedule(() -> {
- System.out.println("spring:" + System.currentTimeMillis());
- }, 5, TimeUnit.SECONDS);
- // 执行一个有返回值任务, 5 秒后执行, 只执行一次
- ScheduledFuture<String> future = scheduledThreadPoolExecutor.schedule(() -> {
- System.out.println("inner summer:" + System.currentTimeMillis());
- return "outer summer:";
- }, 5, TimeUnit.SECONDS);
- // 获取返回值
- System.out.println(future.get() + System.currentTimeMillis());
- // 按固定频率执行一个任务, 每 2 秒执行一次, 1 秒后执行
- // 任务开始时的 2 秒后
- scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
- System.out.println("autumn:" + System.currentTimeMillis());
- LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
- }, 1, 2, TimeUnit.SECONDS);
- // 按固定延时执行一个任务, 每延时 2 秒执行一次, 1 秒执行
- // 任务结束时的 2 秒后, 本文由公从号 "彤哥读源码" 原创
- scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
- System.out.println("winter:" + System.currentTimeMillis());
- LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
- }, 1, 2, TimeUnit.SECONDS);
- }
- }
定时任务总体分为四种:
(1)未来执行一次的任务, 无返回值;
(2)未来执行一次的任务, 有返回值;
(3)未来按固定频率重复执行的任务;
(4)未来按固定延时重复执行的任务;
本文主要以第三种为例进行源码解析.
scheduleAtFixedRate()方法
提交一个按固定频率执行的任务.
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
- long initialDelay,
- long period,
- TimeUnit unit) {
- // 参数判断
- if (command == null || unit == null)
- throw new NullPointerException();
- if (period <= 0)
- throw new IllegalArgumentException();
- // 将普通任务装饰成 ScheduledFutureTask
- ScheduledFutureTask<Void> sft =
- new ScheduledFutureTask<Void>(command,
- null,
- triggerTime(initialDelay, unit),
- unit.toNanos(period));
- // 钩子方法, 给子类用来替换装饰 task, 这里认为 t==sft
- RunnableScheduledFuture<Void> t = decorateTask(command, sft);
- sft.outerTask = t;
- // 延时执行
- delayedExecute(t);
- return t;
- }
可以看到, 这里的处理跟未来任务类似, 都是装饰成另一个任务, 再拿去执行, 不同的是这里交给了 delayedExecute()方法去执行, 这个方法是干嘛的呢?
delayedExecute()方法
延时执行.
- private void delayedExecute(RunnableScheduledFuture<?> task) {
- // 如果线程池关闭了, 执行拒绝策略
- if (isShutdown())
- reject(task);
- else {
- // 先把任务扔到队列中去
- super.getQueue().add(task);
- // 再次检查线程池状态
- if (isShutdown() &&
- !canRunInCurrentRunState(task.isPeriodic()) &&
- remove(task))
- task.cancel(false);
- else
- // 保证有足够有线程执行任务
- ensurePrestart();
- }
- }
- void ensurePrestart() {
- int wc = workerCountOf(ctl.get());
- // 创建工作线程
- // 注意, 这里没有传入 firstTask 参数, 因为上面先把任务扔到队列中去了
- // 另外, 没用上 maxPoolSize 参数, 所以最大线程数量在定时线程池中实际是没有用的
- if (wc <corePoolSize)
- addWorker(null, true);
- else if (wc == 0)
- addWorker(null, false);
- }
到这里就结束了?!
实际上, 这里只是控制任务能不能被执行, 真正执行任务的地方在任务的 run()方法中.
还记得上面的任务被装饰成了 ScheduledFutureTask 类的实例吗? 所以, 我们只要看 ScheduledFutureTask 的 run()方法就可以了.
ScheduledFutureTask 类的 run()方法
定时任务执行的地方.
- public void run() {
- // 是否重复执行
- boolean periodic = isPeriodic();
- // 线程池状态判断
- if (!canRunInCurrentRunState(periodic))
- cancel(false);
- // 一次性任务, 直接调用父类的 run()方法, 这个父类实际上是 FutureTask
- // 这里我们不再讲解, 有兴趣的同学看看上一章的内容
- else if (!periodic)
- ScheduledFutureTask.super.run();
- // 重复性任务, 先调用父类的 runAndReset()方法, 这个父类也是 FutureTask
- // 本文主要分析下面的部分
- else if (ScheduledFutureTask.super.runAndReset()) {
- // 设置下次执行的时间
- setNextRunTime();
- // 重复执行, 本文由公从号 "彤哥读源码" 原创
- reExecutePeriodic(outerTask);
- }
- }
可以看到, 对于重复性任务, 先调用 FutureTask 的 runAndReset()方法, 再设置下次执行的时间, 最后再调用 reExecutePeriodic()方法.
FutureTask 的 runAndReset()方法与 run()方法类似, 只是其任务运行完毕后不会把状态修改为 NORMAL, 有兴趣的同学点进源码看看.
再来看看 reExecutePeriodic()方法.
- void reExecutePeriodic(RunnableScheduledFuture<?> task) {
- // 线程池状态检查
- if (canRunInCurrentRunState(true)) {
- // 再次把任务扔到任务队列中
- super.getQueue().add(task);
- // 再次检查线程池状态
- if (!canRunInCurrentRunState(true) && remove(task))
- task.cancel(false);
- else
- // 保证工作线程足够
- ensurePrestart();
- }
- }
到这里是不是豁然开朗了, 原来定时线程池执行重复任务是在任务执行完毕后, 又把任务扔回了任务队列中.
重复性的问题解决了, 那么, 它是怎么控制任务在某个时刻执行的呢?
OK, 这就轮到我们的延时队列登场了.
DelayedWorkQueue 内部类
我们知道, 线程池执行任务时需要从任务队列中拿任务, 而普通的任务队列, 如果里面有任务就直接拿出来了, 但是延时队列不一样, 它里面的任务, 如果没有到时间也是拿不出来的, 这也是前面分析中一上来就把任务扔进队列且创建 Worker 没有传入 firstTask 的原因.
说了这么多, 它到底是怎么实现的呢?
其实, 延时队列我们在前面都详细分析过, 想看完整源码分析的可以看看之前的《死磕 java 集合之 DelayQueue 源码分析》.
延时队列内部是使用 "堆" 这种数据结构来实现的, 有兴趣的同学可以看看之前的《拜托, 面试别再问我堆 (排序) 了!》.
我们这里只拿一个 take()方法出来分析.
- public RunnableScheduledFuture<?> take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- // 加锁
- lock.lockInterruptibly();
- try {
- for (;;) {
- // 堆顶任务
- RunnableScheduledFuture<?> first = queue[0];
- // 如果队列为空, 则等待
- if (first == null)
- available.await();
- else {
- // 还有多久到时间
- long delay = first.getDelay(NANOSECONDS);
- // 如果小于等于 0, 说明这个任务到时间了, 可以从队列中出队了
- if (delay <= 0)
- // 出队, 然后堆化
- return finishPoll(first);
- // 还没到时间
- first = null;
- // 如果前面有线程在等待, 直接进入等待
- if (leader != null)
- available.await();
- else {
- // 当前线程作为 leader
- Thread thisThread = Thread.currentThread();
- leader = thisThread;
- try {
- // 等待上面计算的延时时间, 再自动唤醒
- available.awaitNanos(delay);
- } finally {
- // 唤醒后再次获得锁后把 leader 再置空
- if (leader == thisThread)
- leader = null;
- }
- }
- }
- }
- } finally {
- if (leader == null && queue[0] != null)
- // 相当于唤醒下一个等待的任务
- available.signal();
- // 解锁, 本文由公从号 "彤哥读源码" 原创
- lock.unlock();
- }
- }
大致的原理是, 利用堆的特性获取最快到时间的任务, 即堆顶的任务:
(1)如果堆顶的任务到时间了, 就让它从队列中了队;
(2)如果堆顶的任务还没到时间, 就看它还有多久到时间, 利用条件锁等待这段时间, 待时间到了后重新走 (1) 的判断;
这样就解决了可以在指定时间后执行任务.
其它
其实, ScheduledThreadPoolExecutor 也是可以使用 execute()或者 submit()提交任务的, 只不过它们会被当成 0 延时的任务来执行一次.
- public void execute(Runnable command) {
- schedule(command, 0, NANOSECONDS);
- }
- public <T> Future<T> submit(Callable<T> task) {
- return schedule(task, 0, NANOSECONDS);
- }
总结
实现定时任务有两个问题要解决, 分别是指定未来某个时刻执行任务, 重复执行.
(1)指定某个时刻执行任务, 是通过延时队列的特性来解决的;
(2)重复执行, 是通过在任务执行后再次把任务加入到队列中来解决的.
彩蛋
到这里基本上普通的线程池的源码解析就结束了, 这种线程池是比较经典的实现方式, 整体上来说, 效率相对不是特别高, 因为所有的工作线程共用同一个队列, 每次从队列中取任务都要加锁解锁操作.
那么, 能不能给每个工作线程配备一个任务队列呢, 在提交任务的时候就把任务分配给指定的工作线程, 这样在取任务的时候就不需要频繁的加锁解锁了.
答案是肯定的, 下一章我们一起来看看这种基于 "工作窃取" 理论的线程池 --ForkJoinPool.
来源: https://www.cnblogs.com/tong-yuan/p/11801757.html