一. 使用线程池的好处
与 "为每一个任务分配一个线程" 相比, 线程池有一些好处.
重用已经创建的线程, 减少了创建, 销毁线程的开销.
任务到达时, 可能线程池中已经有创建好的线程供使用了, 避免了等待线程创建的时间开销.
二. Java 线程池实现原理
在 Java 中创建线程池可以使用 Executors 提供的四个静态方法创建适用于特定情况的几种线程池. 但这些构造方法还是根据需求直接传入特定参数实例化了 ThreadPoolExecutor 类. 所以, 我们要想从原理上理解线程池, 还是要先学习一下 ThreadPoolExecutor 的构造方法, 看看都有哪些参数. 这些参数其实可以理解为线程池的配置信息, 根据自己的需求传入不同的参数就能构造出不同的线程池.
ThreadPollExector 类有 4 个构造方法, 其它三个的参数较少, 使用了一些但归根结底是传入的 7 个关键参数决定了这个线程池是什么样的. 所以关键是这七个参数.
2.1 ThreadPoolExecutor 构造方法的 7 个参数
方法声明如下:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler);
- corePoolSize :
核心线程数. 当线程池刚刚创建时线程数是 0, 这时如果每来一个任务就会创建一个新的线程, 直到已创建的线程数等于核心线程数后就不再创建了, 而是把新任务放入阻塞队里去.
关于初始线程数: 如果调用了 prestartAllCoreThreads() 会直接创建出 corePoolSize 个线程; 如果调用了 prestartCoreThread() 则会提前创建出一个线程.
关于线程存活时间: 核心线程不受 keepAliveTime 的影响, 创建后会一直存在, 直到线程池关闭. 除非调用了 allowCoreThreadTimeOut(true) 方法
这里的疑问是, 如果现有线程是有空闲的, 但没达到核心线程数, 来了新任务会创建新的线程吗?
答: 这个疑问如果理解了线程池的工作过程就不会问了. 详见下面的线程池工作流程.
maximumPoolSize
最大线程数. 是线程池最多允许存在的线程总数. 如果当前线程数已经达到 corePoolSize. 那么就将任务放入队列, 如果队列也满了. 就判断一下当前存在线程数是否小于 maximumPoolSize, 如果是, 则创建新的线程执行任务. 这里创建的线程处于核心线程池外, 受 keepAliveTime 的影响, 如果空闲到达执行时间就会销毁.
*keepAliveTime
控制非核心线程在空闲状态下的存活时间. 如果调用 allowCoreThreadTimedOut 方法, 核心线程也可以受它的影响.
unit
keepAliveTime 的时间单位, 有秒, 毫秒, 分钟, 小时, 天等.
workQueue
指定缓存队列
threadFactory
线程工厂, 用来创建线程.
handler
表示当前拒绝处理任务时的策略, 当缓存队列已满, 线程数也达到 maximumPoolSize 时就按指定的策略处理新提交的任务.
主要有以下四种取值:
ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出 RejectedExecutionException 异常.
ThreadPoolExecutor.DiscardPolicy: 也是丢弃任务, 但是不抛出异常.
ThreadPoolExecutor.DiscardOldestPolicy: 丢弃队列最前面的任务, 然后重新尝试执行任务 (重复此过程)
ThreadPoolExecutor.CallerRunsPolicy: 由调用线程处理该任务
2.2 线程池工作流程
当有新任务提交给线程池时, 首先检查一下当前线程数是否达到了 corePoolSize, 如果没达到则新创建线程, 将当前任务作为该线程的第一个任务执行.
如果已经到达了 corePoolSize 或超过了, 则将任务放入缓存队列.
如果缓存队列已满, 则判断当前线程数是否已经达到了 maximumPoolSize, 如果没到达则创建新线程执行任务
如果线程数达到了 maximumPoolSize, 则使用拒绝策略处理
2.3 ThreadPoolExecutor 源码分析
2.3.1 线程池状态
线程池有 running,shutdown,stop,tidying,termienated 几种状态. 在 jdk1.8 的实现中, 复用了一个 AtomicInteger 对象来同时存储线程状态和当前线程数. 具体代码不做展开, 理解这种做法即可.
2.3.2 参数介绍
ReentrantLock mainLook 一个锁, 添加工作线程时要先获取锁
HashSet<Worker> workers 存储工作线程
int largesetPoolSize 记录曾经达到的最大线程数
long completedTaskCount 记录已经完成的任务数量
其他还有构造方法传入的 7 个参数也都有相应的属性进行保存
2.3.3 execute(Runnable command) 方法分析
通过这个方法将任务提交给线程池执行. 这个方法基本是 2.2 节线程池工作流程执行的.
- public void execute(Runnable command) {
- if (command == null) // 空指针异常
- throw new NullPointerException();
- // 获取 clt, 这个 AtomicInteger 对象中存储着当前线程数和线程运行状态
- int c = ctl.get();
- // 如果当前线程数小于核心线程数则执行添加线程动作
- if (workerCountOf(c) <corePoolSize) {
- //addWorker 的第二个参数表示添加的是否是核心线程, 这里是 true
- if (addWorker(command, true))
- return;
- // 添加后重新获取状态值, 因为线程数已经有变化了, 线程池状态也可能变了
- c = ctl.get();
- }
- // 执行到这里, 说明添加核心线程不成功, 可能是数量达到 corePoolSize 或线程池 shutdown 了
- // 如果线程池还在运行尝试将任务加入缓冲队列
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- // 如果添加到队列后, 线程池停止运行了, 将任务从队列移除
- if (! isRunning(recheck) && remove(command))
- reject(command); // 移除成功后使用拒绝策略处理任务
- else if (workerCountOf(recheck) == 0) // 分析 1: 如果核心线程池为空, 添加一个非核心线程, 处理队列中可能的任务
- addWorker(null, false);
- }
- else if (!addWorker(command, false)) // 队列满了, 启动非核心线程执行任务
- reject(command); // 非核心线程启动失败, 执行拒绝策略
- }
2.3.4 addWorker(Runnable,Boolean) 分析
addWorker 方法用来给线程池中添加线程. 第二个参数表示添加的是否是核心线程. 下面来看一下它是怎么工作的吧!
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // 分析 1: 注意这个 if 语句是和 2.3.3 中分析 1 相呼应的.
- // 调用 shutdown() 将空闲线程 interrupt, 正在执行的线程继续执行, 将状态设为 shutdown
- // 调用 shutdownNow() 将所有线程中断, 不管有没有执行完.
- // 如果 shutdown() 后, 核心线程都关闭了, 队列中却还有元素, 2.3.3 分析 1 就添加了新的非核心线程处理, 就是这里的!(rs==SHUTDOWN&&......) 这种情况
- if (rs>= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- for (;;) {
- int wc = workerCountOf(c);
- // 超过执行线程数, 返回 false
- if (wc>= CAPACITY ||
- wc>= (core ? corePoolSize : maximumPoolSize))
- return false;
- //cas 将线程计数加 1, 失败后重试
- if (compareAndIncrementWorkerCount(c))
- break retry;
- c = ctl.get(); // Re-read ctl
- // 线程状态发生变化重新循环
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- // 下面是添加线程的过程
- boolean workerStarted = false; // 标识线程是否启动
- boolean workerAdded = false; // 标识线程是否被添加
- Worker w = null;
- try {
- w = new Worker(firstTask); // 新生成 worker, 线程池中的线程用它表达
- final Thread t = w.thread; // 获取实际的线程
- if (t != null) {
- // 添加线程要先获取锁
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- // 获取当前状态
- int rs = runStateOf(ctl.get());
- // 不是 shutdown 状态或者, 处于 shutdown 状态但添加的 tash 是 null, 属于 shutdown 下添加线程处理队列中剩余任务的情况
- if (rs <SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- workers.add(w); // 添加到 workers,workers 持有线程
- int s = workers.size();
- if (s> largestPoolSize) // 记录最大线程数记录
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- t.start(); // 启动线程执行
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted) // 没成功
- addWorkerFailed(w);
- }
- return workerStarted;
- }
2.3.5 worker 的执行
添加线程后, worker 就开始执行了, 在它的执行方法 run 里会直接调用 tast 的 run 方法, 执行要干的事情.
执行完成之后会去队列获取新的任务执行.
如果没有新任务执行呢?
Worker 是 ThreadPoolExecutor 的一个内部类, 它的实现了 Runnable 方法. 这里有个问题是 Runnable 需要传递给 Thread 才能执行. Worker 是如何做到的呢?
原来, Worker 类持有了一个 Thread 类型的变量 thread, 并在初始化时使用 Worker 本身初始化了 thread
Worker 的构造方法:
- Worker(Runnable firstTask) {
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);
- }
Worker 的 run 方法调用了 runWoker(this).Worker 还继承了 AQS, 这一块还有待学习.
第一次启动会执行初始化传进来的任务 firstTask; 然后会从 workQueue 中取任务执行, 如果队列为空则等待 keepAliveTime 这么长时间
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- // 第一次时执行外部传过来的 task, 后面从 getTask 获取, getTask 从队列获取任务执行,
- // 如果队列为空则等待 keepAliveTime 这么长的时间
- while (task != null || (task = getTask()) != null) {
- w.lock();
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
2.3.5 其他内容
下一步应该分析 getTask 是如何从队列获取任务的了, 这里不再展开讲了.
此外, 还应该有队列的实现和选择问题, 拒绝策略的具体等, 目前先不做如此多的分析了, 待未来时机成熟了再完善.
二. 使用 Executors 创建具有默认配置的线程池
java.util.concurrent.Executors 中提供了 4 个静态方法, 可以用来创建具有指定特性的线程池. 都是实例化了 ThreadPoolExecutor 对象.
2.1 newFixedThreadPool() 方法
返回一个带缓存的线程池, 该池在必要的时候创建线程, 在线程空闲 60s 之后终止线程.
下面看一下这个方法的源码实现:
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- }
- 2.2 newFixedThreadPool
创建一个固定长度的线程池, 每当提交一个任务时就创建一个线程, 指导达到线程池的最大数量, 这是线程池的规模将不再变化 (如果某个线程由于发生了未预期的 Exception 而结束, 那么线程池会补充一个新的线程)
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
- }
- 2.3newScheduledThreadPool
创建一个固定长度的线程池, 而且以延迟或定时的方式来执行任务, 类似于 Timer
- public static ScheduledExecutorService newScheduledThreadPool(
- int corePoolSize, ThreadFactory threadFactory) {
- return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
- }
- 2.4newSingleThreadExecutor
是一个单线程的 Executor, 它创建单个工作者线程来执行任务, 如果这个线程异常结束, 会创建另一个线程来替代.
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>()));
- }
来源: http://www.jianshu.com/p/658b01014745