多线程的异步执行方式,虽然能够最大限度发挥多核计算机的计算能力,但是如果不加控制,反而会对系统造成负担。线程本身也要占用内存空间,大量的线程会占用内存资源并且可能会导致 Out of Memory。即便没有这样的情况,大量的线程回收也会给 GC 带来很大的压力。
为了避免重复的创建线程,线程池的出现可以让线程进行复用。通俗点讲,当有工作来,就会向线程池拿一个线程,当工作完成后,并不是直接关闭线程,而是将这个线程归还给线程池供其他任务使用。
接下来从总体到细致的方式,来共同探讨线程池。
来看 Executor 的框架图:
结合上面的流程图来逐行解析,首先前面进行空指针检查,
wonrkerCountOf() 方法能够取得当前线程池中的线程的总数,取得当前线程数与核心池大小比较,
excute() 方法中添加任务的方式是使用 addWorker()方法,看一下源码,一起学习一下。
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- // 外层循环,用于判断线程池状态
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- // 内层的循环,任务是将worker数量加1
- for (;;) {
- int wc = workerCountOf(c);
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- if (compareAndIncrementWorkerCount(c))
- break retry;
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- // worker加1后,接下来将woker添加到HashSet<Worker>中,并启动worker
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- final ReentrantLock mainLock = this.mainLock;
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int c = ctl.get();
- int rs = runStateOf(c);
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- // 如果往HashSet<Worker>添加成功,则启动该线程
- if (workerAdded) {
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
addWorker(Runnable firstTask, boolean core) 的主要任务是创建并启动线程。
他会根据当前线程的状态和给定的值(core or maximum)来判断是否可以创建一个线程。
addWorker 共有四种传参方式。execute 使用了其中三种,分别为:1.addWorker(paramRunnable, true)
线程数小于 corePoolSize 时,放一个需要处理的 task 进 Workers Set。如果 Workers Set 长度超过 corePoolSize,就返回 false.
2.
addWorker(null, false)
放入一个空的 task 进 workers Set,长度限制是 maximumPoolSize。这样一个 task 为空的 worker 在线程执行的时候会去任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务。
3.addWorker(paramRunnable, false)
当队列被放满时,就尝试将这个新来的 task 直接放入 Workers Set,而此时 Workers Set 的长度限制是 maximumPoolSize。如果线程池也满了的话就返回 false.
还有一种情况是 execute() 方法没有使用的
addWorker(null, true)
这个方法就是放一个 null 的 task 进 Workers Set,而且是在小于 corePoolSize 时,如果此时 Set 中的数量已经达到 corePoolSize 那就返回 false,什么也不干。实际使用中是在 prestartAllCoreThreads() 方法,这个方法用来为线程池预先启动 corePoolSize 个 worker 等待从 workQueue 中获取任务执行。
执行流程:1、判断线程池当前是否为可以添加 worker 线程的状态,可以则继续下一步,不可以 return false:
A、线程池状态 > shutdown,可能为 stop、tidying、terminated,不能添加 worker 线程
B、线程池状态 ==shutdown,firstTask 不为空,不能添加 worker 线程,因为 shutdown 状态的线程池不接收新任务
C、线程池状态 ==shutdown,firstTask==null,workQueue 为空,不能添加 worker 线程,因为 firstTask 为空是为了添加一个没有任务的线程再从 workQueue 获取 task,而 workQueue 为 空,说明添加无任务线程已经没有意义
2、线程池当前线程数量是否超过上限(corePoolSize 或 maximumPoolSize),超过了 return false,没超过则对 workerCount+1,继续下一步
3、在线程池的 ReentrantLock 保证下,向 Workers Set 中添加新创建的 worker 实例,添加完成后解锁,并启动 worker 线程,如果这一切都成功了,return true,如果添加 worker 入 Set 失败或启动失败,调用 addWorkerFailed() 逻辑
- public static ExecutorService newFixedThreadPool(int var0) {
- return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
- }
- public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
- return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1);
- }
固定大小的线程池,可以指定线程池的大小,该线程池 corePoolSize 和 maximumPoolSize 相等,阻塞队列使用的是 LinkedBlockingQueue,大小为整数最大值。
该线程池中的线程数量始终不变,当有新任务提交时,线程池中有空闲线程则会立即执行,如果没有,则会暂存到阻塞队列。对于固定大小的线程池,不存在线程数量的变化。同时使用无界的 LinkedBlockingQueue 来存放执行的任务。当任务提交十分频繁的时候,LinkedBlockingQueue
迅速增大,存在着耗尽系统资源的问题。而且在线程池空闲时,即线程池中没有可运行任务时,它也不会释放工作线程,还会占用一定的系统资源,需要 shutdown。
newSingleThreadExecutor
- public static ExecutorService newSingleThreadExecutor() {
- return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
- }
- public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
- return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
- }
单个线程线程池,只有一个线程的线程池,阻塞队列使用的是 LinkedBlockingQueue, 若有多余的任务提交到线程池中,则会被暂存到阻塞队列,待空闲时再去执行。按照先入先出的顺序执行任务。
newCachedThreadPool
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
- }
- public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
- return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0);
- }
缓存线程池,缓存的线程默认存活 60 秒。线程的核心池 corePoolSize 大小为 0,核心池最大为 Integer.MAX_VALUE, 阻塞队列使用的是 SynchronousQueue。是一个直接提交的阻塞队列, 他总会迫使线程池增加新的线程去执行新的任务。在没有任务执行时,当线程的空闲时间超过 keepAliveTime(60 秒),则工作线程将会终止被回收,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销。如果同时又大量任务被提交,而且任务执行的时间不是特别快,那么线程池便会新增出等量的线程池处理任务,这很可能会很快耗尽系统的资源。
newScheduledThreadPool
- public static ScheduledExecutorService newScheduledThreadPool(int var0) {
- return new ScheduledThreadPoolExecutor(var0);
- }
- public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
- return new ScheduledThreadPoolExecutor(var0, var1);
- }
定时线程池,该线程池可用于周期性地去执行任务,通常用于周期性的同步数据。
scheduleAtFixedRate: 是以固定的频率去执行任务,周期是指每次执行任务成功执行之间的间隔。
schedultWithFixedDelay: 是以固定的延时去执行任务,延时是指上一次执行成功之后和下一次开始执行的之前的时间。
newFixedThreadPool 实例:
View Code
newCachedThreadPool 实例:
View Code
这里没用调用 shutDown 方法,这里可以发现过 60 秒之后,会自动释放资源。
newSingleThreadExecutor
View Code
这里需要注意一点,newSingleThreadExecutor 和 newFixedThreadPool 一样,在线程池中没有任务时可执行,也不会释放系统资源的,所以需要 shudown。
newScheduledThreadPool
View Code
当然线程池的大小也不需要做的太过于精确,只需要避免过大和过小的情况。一般来说,确定线程池的大小需要考虑 CPU 的数量,内存大小,任务是计算密集型还是 IO 密集型等因素
NCPU = CPU 的数量 UCPU = 期望对 CPU 的使用率 0 ≤ UCPU ≤ 1W/C = 等待时间与计算时间的比率如果希望处理器达到理想的使用率,那么线程池的最优大小为:线程池大小 = NCPU *UCPU(1+W/C)在 Java 中使用
- int ncpus = Runtime.getRuntime().availableProcessors();
获取 CPU 的数量。
Executors 的线程池如果不指定线程工厂会使用 Executors 中的 DefaultThreadFactory, 默认线程池工厂创建的线程都是非守护线程。
使用自定义的线程工厂可以做很多事情,比如可以跟踪线程池在何时创建了多少线程,也可以自定义线程名称和优先级。如果将
新建的线程都设置成守护线程,当主线程退出后,将会强制销毁线程池。
下面这个例子,记录了线程的创建,并将所有的线程设置成守护线程。
View Code
ThreadPoolExecutor 是可以拓展的,它提供了几个可以在子类中改写的方法:beforeExecute,afterExecute 和 terimated。
在执行任务的线程中将调用 beforeExecute 和 afterExecute, 这些方法中还可以添加日志,计时,监视或统计收集的功能,
还可以用来输出有用的调试信息,帮助系统诊断故障。下面是一个扩展线程池的例子:
View Code
线程池的正确使用
以下阿里编码规范里面说的一段话:
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 各个方法的弊端:
1)newFixedThreadPool 和 newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。
2)newCachedThreadPool 和 newScheduledThreadPool:
主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。
1. 任务独立。如何任务依赖于其他任务,那么可能产生死锁。例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁。
2. 合理配置阻塞时间过长的任务。如果任务阻塞时间过长,那么即使不出现死锁,线程池的性能也会变得很糟糕。在 Java 并发包里可阻塞方法都同时定义了限时方式和不限时方式。例如
Thread.join,BlockingQueue.put,CountDownLatch.await 等,如果任务超时,则标识任务失败,然后中止任务或者将任务放回队列以便随后执行,这样,无论任务的最终结果是否成功,这种办法都能够保证任务总能继续执行下去。
3. 设置合理的线程池大小。只需要避免过大或者过小的情况即可,上文的公式线程池大小 = NCPU *UCPU(1+W/C)。
4. 选择合适的阻塞队列。newFixedThreadPool 和 newSingleThreadExecutor 都使用了无界的阻塞队列,无界阻塞队列会有消耗很大的内存,如果使用了有界阻塞队列,它会规避内存占用过大的问题,但是当任务填满有界阻塞队列,新的任务该怎么办?在使用有界队列是,需要选择合适的拒绝策略,队列的大小和线程池的大小必须一起调节。对于非常大的或者无界的线程池,可以使用 SynchronousQueue 来避免任务排队,以直接将任务从生产者提交到工作者线程。
下面是 Thrift 框架处理 socket 任务所使用的一个线程池,可以看一下 FaceBook 的工程师是如何自定义线程池的。
总结:
- private static ExecutorService createDefaultExecutorService(Args args) {
- SynchronousQueue executorQueue = new SynchronousQueue();
- return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS,
- executorQueue);
- }
本文是作者在平时的工作学习中总结出来的,如果不足之处欢迎批评斧正。
参考资料《实战 Java》高并发程序设计
《Java Concurrency in Practice》
Java 线程池 ThreadPoolExecutor 使用和分析 (二)来源: https://www.cnblogs.com/superfj/p/7544971.html