前言
线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配,调优和监控,有以下好处:
1,降低资源消耗;
2,提高响应速度;
3,提高线程的可管理性.
Java1.5 中引入的 Executor 框架把任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池,而不用关心该任务是如何执行,被哪个线程执行,以及什么时候执行.
如果你也想在 IT 行业拿高薪,可以参加我们的训练营课程,选择最适合自己的课程学习,技术大牛亲授,7 个月后,进入名企拿高薪.我们的课程内容有:Java 工程化,高性能及分布式,高性能,深入浅出.高架构.性能调优,Spring,MyBatis,Netty 源码分析和大数据等多个知识点.如果你想拿高薪的,想学习的,想就业前景好的,想跟别人竞争能取得优势的,想进阿里面试但担心面试不过的,你都可以来,群号为:575745314
demo
1,Executors.newFixedThreadPool(10) 初始化一个包含 10 个线程的线程池 executor;
2,通过 executor.execute 方法提交 20 个任务,每个任务打印当前的线程名;
3,负责执行任务的线程的生命周期都由 Executor 框架进行管理;
ThreadPoolExecutor
Executors 是 java 线程池的工厂类,通过它可以快速初始化一个符合业务需求的线程池,如 Executors.newFixedThreadPool 方法可以生成一个拥有固定线程数的线程池.
其本质是通过不同的参数初始化一个 ThreadPoolExecutor 对象,具体参数描述如下:
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize;如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的 prestartAllCoreThreads() 方法,线程池会提前创建并启动所有核心线程.
maximumPoolSize
线程池中允许的最大线程数.如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize;
keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认情况下,该参数只在线程数大于 corePoolSize 时才有用;
unit
keepAliveTime 的单位;
workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现 Runable 接口,在 JDK 中提供了如下阻塞队列:
1,ArrayBlockingQueue:基于数组结构的有界阻塞队列,按 FIFO 排序任务;
2,LinkedBlockingQuene:基于链表结构的阻塞队列,按 FIFO 排序任务,吞吐量通常要高于 ArrayBlockingQuene;
3,SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQuene;
4,priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名.
handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:
1,AbortPolicy:直接抛出异常,默认策略;
2,CallerRunsPolicy:用调用者所在的线程来执行任务;
3,DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4,DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务.
Exectors
Exectors 工厂类提供了线程池的初始化接口,主要有如下几种:
newFixedThreadPool
初始化一个指定线程数的线程池,其中 corePoolSize == maximumPoolSize,使用 LinkedBlockingQuene 作为阻塞队列,不过当线程池没有可执行任务时,也不会释放线程.
newCachedThreadPool
1,初始化一个可以缓存线程的线程池,默认缓存 60s,线程池的线程数可达到 Integer.MAX_VALUE,即 2147483647,内部使用 SynchronousQueue 作为阻塞队列;
2,和 newFixedThreadPool 创建的线程池不同,newCachedThreadPool 在没有任务执行时,当线程的空闲时间超过 keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;
所以,使用该线程池时,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题.
newSingleThreadExecutor
初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行,内部使用 LinkedBlockingQueue 作为阻塞队列.
newScheduledThreadPool
初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据.
实现原理
除了 newScheduledThreadPool 的内部实现特殊一点之外,其它几个线程池都是基于 ThreadPoolExecutor 类实现的.
线程池内部状态
其中 AtomicInteger 变量 ctl 的功能非常强大:利用低 29 位表示线程池中线程数,通过高 3 位表示线程池的运行状态:
1,RUNNING:-1 << COUNT_BITS,即高 3 位为 111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
2,SHUTDOWN: 0 << COUNT_BITS,即高 3 位为 000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
3,STOP : 1 << COUNT_BITS,即高 3 位为 001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
4,TIDYING : 2 << COUNT_BITS,即高 3 位为 010;
5,TERMINATED: 3 << COUNT_BITS,即高 3 位为 011;
任务提交
线程池框架提供了两种方式提交任务,根据不同的业务需求选择不同的方式.
Executor.execute()
通过 Executor.execute() 方法提交的任务,必须实现 Runnable 接口,该方式提交的任务不能获取返回值,因此无法判断任务是否执行成功.
ExecutorService.submit()
通过 ExecutorService.submit() 方法提交的任务,可以获取任务执行完的返回值.
任务执行
当向线程池中提交一个任务,线程池会如何处理该任务?
execute 实现
具体的执行流程如下:
1,workerCountOf 方法根据 ctl 的低 29 位,得到线程池的当前线程数,如果线程数小于 corePoolSize,则执行 addWorker 方法创建新的线程执行任务;否则执行步骤(2);
2,如果线程池处于 RUNNING 状态,且把提交的任务成功放入阻塞队列中,则执行步骤(3),否则执行步骤(4);
3,再次检查线程池的状态,如果线程池没有 RUNNING,且成功从阻塞队列中删除任务,则执行 reject 方法处理任务;
4,执行 addWorker 方法创建新的线程执行任务,如果 addWoker 执行失败,则执行 reject 方法处理任务;
addWorker 实现
从方法 execute 的实现可以看出:addWorker 主要负责创建新的线程并执行任务,代码实现如下:
这只是 addWoker 方法实现的前半部分:
1,判断线程池的状态,如果线程池的状态值大于或等 SHUTDOWN,则不处理提交的任务,直接返回;
2,通过参数 core 判断当前需要创建的线程是否为核心线程,如果 core 为 true,且当前线程数小于 corePoolSize,则跳出循环,开始创建新的线程,具体实现如下:
线程池的工作线程通过 Woker 类实现,在 ReentrantLock 锁的保证下,把 Woker 实例插入到 HashSet 后,并启动 Woker 中的线程,其中 Worker 类设计如下:
1,继承了 AQS 类,可以方便的实现工作线程的中止操作;
2,实现了 Runnable 接口,可以将自身作为一个任务在工作线程中执行;
3,当前提交的任务 firstTask 作为参数传入 Worker 的构造方法;
从 Woker 类的构造方法实现可以发现:线程工厂在创建线程 thread 时,将 Woker 实例本身 this 作为参数传入,当执行 start 方法启动线程 thread 时,本质是执行了 Worker 的 runWorker 方法.
runWorker 实现
runWorker 方法是线程池的核心:
1,线程启动之后,通过 unlock 方法释放锁,设置 AQS 的 state 为 0,表示运行中断;
2,获取第一个任务 firstTask,执行任务的 run 方法,不过在执行任务之前,会进行加锁操作,任务执行完会释放锁;
3,在执行任务的前后,可以根据业务场景自定义 beforeExecute 和 afterExecute 方法;
4,firstTask 执行完成之后,通过 getTask 方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask 方法会被阻塞并挂起,不会占用 cpu 资源;
getTask 实现
整个 getTask 操作在自旋下完成:
1,workQueue.take:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take 方法返回任务,并执行;
2,workQueue.poll:如果在 keepAliveTime 时间内,阻塞队列还是没有任务,则返回 null;
所以,线程池中实现的线程可以一直执行由用户提交的任务.
Future 和 Callable 实现
通过 ExecutorService.submit() 方法提交的任务,可以获取任务执行完的返回值.
在实际业务场景中,Future 和 Callable 基本是成对出现的,Callable 负责产生结果,Future 负责获取结果.
1,Callable 接口类似于 Runnable,只是 Runnable 没有返回值.
2,Callable 任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即 Future 可以拿到异步执行任务各种结果;
3,Future.get 方法会导致主线程阻塞,直到 Callable 任务执行完成;
submit 实现
通过 submit 方法提交的 Callable 任务会被封装成了一个 FutureTask 对象.
FutureTask
1,FutureTask 在不同阶段拥有不同的状态 state,初始化为 NEW;
2,FutureTask 类实现了 Runnable 接口,这样就可以通过 Executor.execute 方法提交 FutureTask 到线程池中等待被执行,最终执行的是 FutureTask 的 run 方法;
FutureTask.get 实现
内部通过 awaitDone 方法对主线程进行阻塞,具体实现如下:
1,如果主线程被中断,则抛出中断异常;
2,判断 FutureTask 当前的 state,如果大于 COMPLETING,说明任务已经执行完成,则直接返回;
3,如果当前 state 等于 COMPLETING,说明任务已经执行完,这时主线程只需通过 yield 方法让出 cpu 资源,等待 state 变成 NORMAL;
4,通过 WaitNode 类封装当前线程,并通过 UNSAFE 添加到 waiters 链表;
5,最终通过 LockSupport 的 park 或 parkNanos 挂起线程;
FutureTask.run 实现
FutureTask.run 方法是在线程池中被执行的,而非主线程
1,通过执行 Callable 任务的 call 方法;
2,如果 call 执行成功,则通过 set 方法保存结果;
3,如果 call 执行有异常,则通过 setException 保存异常;
set
setException
set 和 setException 方法中,都会通过 UnSAFE 修改 FutureTask 的状态,并执行 finishCompletion 方法通知主线程任务已经执行完成;
finishCompletion
1,执行 FutureTask 类的 get 方法时,会把主线程封装成 WaitNode 节点并保存在 waiters 链表中;
2,FutureTask 任务执行完成后,通过 UNSAFE 设置 waiters 的值,并通过 LockSupport 类 unpark 方法唤醒主线程;
深入分析 java 线程池的实现原理
来源: http://www.bubuko.com/infodetail-2474458.html