(手机横屏看源码更方便)
注: java 源码分析部分如无特殊说明均基于 java8 版本.
简介
Java 的线程池是块硬骨头, 对线程池的源码做深入研究不仅能提高对 Java 整个并发编程的理解, 也能提高自己在面试中的表现, 增加被录取的可能性.
本系列将分成很多个章节, 本章作为线程池的第一章将对整个线程池体系做一个总览.
体系结构
上图列举了线程池中非常重要的接口和类:
(1)Executor, 线程池顶级接口;
(2)ExecutorService, 线程池次级接口, 对 Executor 做了一些扩展, 增加一些功能;
(3)ScheduledExecutorService, 对 ExecutorService 做了一些扩展, 增加一些定时任务相关的功能;
(4)AbstractExecutorService, 抽象类, 运用模板方法设计模式实现了一部分方法;
(5)ThreadPoolExecutor, 普通线程池类, 这也是我们通常所说的线程池, 包含最基本的一些线程池操作相关的方法实现;
(6)ScheduledThreadPoolExecutor, 定时任务线程池类, 用于实现定时任务相关功能;
(7)ForkJoinPool, 新型线程池类, java7 中新增的线程池类, 基于工作窃取理论实现, 运用于大任务拆小任务, 任务无限多的场景;
(8)Executors, 线程池工具类, 定义了一些快速实现线程池的方法(谨慎使用);
Executor
线程池顶级接口, 只定义了一个执行无返回值任务的方法.
- public interface Executor {
- // 执行无返回值任务[本篇文章由公众号 "彤哥读源码" 原创]
- void execute(Runnable command);
- }
- ExecutorService
线程池次级接口, 对 Executor 做了一些扩展, 主要增加了关闭线程池, 执行有返回值任务, 批量执行任务的方法.
- public interface ExecutorService extends Executor {
- // 关闭线程池, 不再接受新任务, 但已经提交的任务会执行完成
- void shutdown();
- // 立即关闭线程池, 尝试停止正在运行的任务, 未执行的任务将不再执行
- // 被迫停止及未执行的任务将以列表的形式返回
- List<Runnable> shutdownNow();
- // 检查线程池是否已关闭
- boolean isShutdown();
- // 检查线程池是否已终止, 只有在 shutdown()或 shutdownNow()之后调用才有可能为 true
- boolean isTerminated();
- // 在指定时间内线程池达到终止状态了才会返回 true
- boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException;
- // 执行有返回值的任务, 任务的返回值为 task.call()的结果
- <T> Future<T> submit(Callable<T> task);
- // 执行有返回值的任务, 任务的返回值为这里传入的 result
- // 当然只有当任务执行完成了调用 get()时才会返回
- <T> Future<T> submit(Runnable task, T result);
- // 执行有返回值的任务, 任务的返回值为 null
- // 当然只有当任务执行完成了调用 get()时才会返回
- Future<?> submit(Runnable task);
- // 批量执行任务, 只有当这些任务都完成了这个方法才会返回
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException;
- // 在指定时间内批量执行任务, 未执行完成的任务将被取消
- // 这里的 timeout 是所有任务的总时间, 不是单个任务的时间
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException;
- // 返回任意一个已完成任务的执行结果, 未执行完成的任务将被取消
- <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException;
- // 在指定时间内如果有任务已完成, 则返回任意一个已完成任务的执行结果, 未执行完成的任务将被取消
- <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }
- ScheduledExecutorService
对 ExecutorService 做了一些扩展, 增加一些定时任务相关的功能, 主要包含两大类: 执行一次, 重复多次执行.
- public interface ScheduledExecutorService extends ExecutorService {
- // 在指定延时后执行一次
- public ScheduledFuture<?> schedule(Runnable command,
- long delay, TimeUnit unit);
- // 在指定延时后执行一次
- public <V> ScheduledFuture<V> schedule(Callable<V> callable,
- long delay, TimeUnit unit);
- // 在指定延时后开始执行, 并在之后以指定时间间隔重复执行(间隔不包含任务执行的时间)
- // 相当于之后的延时以任务开始计算[本篇文章由公众号 "彤哥读源码" 原创]
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
- long initialDelay,
- long period,
- TimeUnit unit);
- // 在指定延时后开始执行, 并在之后以指定延时重复执行(间隔包含任务执行的时间)
- // 相当于之后的延时以任务结束计算
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
- long initialDelay,
- long delay,
- TimeUnit unit);
- }
- AbstractExecutorService
抽象类, 运用模板方法设计模式实现了一部分方法, 主要为执行有返回值任务, 批量执行任务的方法.
- public abstract class AbstractExecutorService implements ExecutorService {
- protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- return new FutureTask<T>(runnable, value);
- }
- protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- return new FutureTask<T>(callable);
- }
- public Future<?> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<Void> ftask = newTaskFor(task, null);
- execute(ftask);
- return ftask;
- }
- public <T> Future<T> submit(Runnable task, T result) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task, result);
- execute(ftask);
- return ftask;
- }
- public <T> Future<T> submit(Callable<T> task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task);
- execute(ftask);
- return ftask;
- }
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException {
- // 略...
- }
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- // 略...
- }
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException {
- // 略...
- }
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException {
- // 略...
- }
- }
可以看到, 这里的 submit()方法对传入的任务都包装成了 FutureTask 来进行处理, 这是什么东西呢? 欢迎关注后面的章节.
ThreadPoolExecutor
普通线程池类, 这也是我们通常所说的线程池, 包含最基本的一些线程池操作相关的方法实现.
线程池的主要实现逻辑都在这里面, 比如线程的创建, 任务的处理, 拒绝策略等, 我们后面单独分析这个类.
ScheduledThreadPoolExecutor
定时任务线程池类, 用于实现定时任务相关功能, 将任务包装成定时任务, 并按照定时策略来执行, 我们后面单独分析这个类.
问题: 你知道定时任务线程池类使用的是什么队列吗?
ForkJoinPool
新型线程池类, java7 中新增的线程池类, 这个线程池与 Go 中的线程模型特别类似, 都是基于工作窃取理论, 特别适合于处理归并排序这种先分后合的场景.
Executors
线程池工具类, 定义了一系列快速实现线程池的方法 --newXXX(), 不过阿里手册是不建议使用这个类来新建线程池的, 彤哥我并不这么认为, 只要能掌握其源码, 知道其利敝偶尔还是可以用的, 后面我们再来说这个事.
彩蛋
无彩蛋不欢, 今天的问题是定时任务线程池用的是哪种队列来实现的?
答: 延时队列. 定时任务线程池中并没有直接使用并发集合中的 DelayQueue, 而是自己又实现了一个 DelayedWorkQueue, 不过跟 DelayQueue 的实现原理是一样的.
延时队列使用什么数据结构来实现的呢?
答: 堆(DelayQueue 中使用的是优先级队列, 而优先级队列使用的堆; DelayedWorkQueue 直接使用的堆).
关于延时队列, 优先级队列和堆的相关内容点击下面的链接直达:
死磕 java 集合之 DelayQueue 源码分析 https://mp.weixin.qq.com/s/IOTwwgaOdMpZl-6QM0HlVQ
死磕 java 集合之 PriorityQueue 源码分析 https://mp.weixin.qq.com/s/kGKS7WXWbf-ME1_Hr3Fpgw
拜托, 面试别再问我堆 (排序) 了! https://mp.weixin.qq.com/s/AF2tMHfofG8b51yIyaIReg
来源: https://www.cnblogs.com/tong-yuan/p/11674974.html