好久不发文章了, 难道是因为忙, 其实是因为懒. 这是一篇关于线程池使用和基本原理的科普水文, 如果你经常用到线程池, 不知道你的用法标准不标准, 是否有隐藏的 OOM 风险. 不经常用线程池的同学, 还有对几种线程的使用不甚了解的同学可以读一下此文.
为什么要使用线程池
虽然大家应该都已经很清楚了, 但还是说一下. 其实归根结底最主要的一个原因就是为了提高性能.
线程池和数据库连接池是同样的道理, 数据库连接池是为了减少连接建立和释放带来的性能开销. 而线程池则是为了减少线程建立和销毁带来的性能消耗.
以 web 项目为例, 有以下两种情况:
1, 每次过来一个请求, 都要在服务端创建一个新线程来处理请求, 请求处理完成销毁线程;
2, 每次过来一个请求, 服务端在线程池中直接拿过一个空闲的线程来处理这个请求, 处理完成后还给线程池;
答案是肯定的, 肯定是第二种使用线程池的方式性能更好.
除了性能这个最重要的原因外, 线程池的使用可以帮助我们更合理的使用系统资源. 还是以 web 项目为例, 如果我们在服务端不使用线程池, 而是无节制的来一个请求创建一个线程, 系统资源将会很快被耗尽. 而使用线程池的话, 则可以防止这种情况发生, 当然这要建立在正确合理的使用线程池的基础上, 要固定线程的最大数以及等待队列的大小.
几种线程池的使用和原理
线程池固然好用, 但是要建立在正确的使用方式的基础上, 如果使用方式不当, 同样会出现问题. 接下来就介绍一下几种线程池的使用.
在大名鼎鼎的 J.U.C 包下已经提供了 Executors 类, 它已经封装实现了四种创建线程池的方式, 它暴露出几个简单的方法供开发者调用. 最终都是通过 new ThreadPoolExecutor() ExecutorService 实例, 从而得到我们想要的线程池类型. 这样做其实有利有弊, 好的是我们不用关心那么多参数, 只需要简单的指定一两个参数就可以; 不好的是, 这样一来又屏蔽了很多细节, 如果有些参数使用默认的, 而开发者又不了解原理的情况下, 可能会造成 OOM 等问题.
很多公司都不建议或者强制不允许直接使用 Executors 类提供的方法来创建线程池, 例如阿里巴巴 Java 开发手册里就明确不允许这样创建线程池, 一定要通过 ThreadPoolExecutor(xx,xx,xx...) 来明确线程池的运行规则, 指定更合理的参数.
先来看一下 ThreadPoolExecutor 的几个参数和它们的意义, 先来看一下它最完整参数的重载.
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
一共有 7 个参数.
corePoolSize
核心线程数, 当有任务进来的时候, 如果当前线程数还未达到 corePoolSize 个数, 则创建核心线程, 核心线程有几个特点:
1, 当线程数未达到核心线程最大值的时候, 新任务进来, 即使有空闲线程, 也不会复用, 仍然新建核心线程;
2, 核心线程一般不会被销毁, 即使是空闲的状态, 但是如果通过方法 allowCoreThreadTimeOut(boolean value) 设置为 true 时, 超时也同样会被销毁;
3, 生产环境首次初始化的时候, 可以调用 prestartCoreThread() 方法来预先创建所有核心线程, 避免第一次调用缓慢;
maximumPoolSize
除了有核心线程外, 有些策略是当核心线程完全无空闲的时候, 还会创建一些临时的线程来处理任务, maximumPoolSize 就是核心线程 + 临时线程的最大上限. 临时线程有一个超时机制, 超过了设置的空闲时间没有事儿干, 就会被销毁.
keepAliveTime
这个就是上面两个参数里所提到的超时时间, 也就是线程的最大空闲时间, 默认用于非核心线程, 通过 allowCoreThreadTimeOut(boolean value) 方法设置后, 也会用于核心线程.
unit
这个参数配合上面的 keepAliveTime , 指定超时的时间单位, 秒, 分, 时等.
workQueue
等待执行的任务队列, 如果核心线程没有空闲的了, 新来的任务就会被放到这个等待队列中. 这个参数其实一定程度上决定了线程池的运行策略, 为什么这么说呢, 因为队列分为有界队列和无界队列.
有界队列: 队列的长度有上限, 当核心线程满载的时候, 新任务进来进入队列, 当达到上限, 有没有核心线程去即时取走处理, 这个时候, 就会创建临时线程.(警惕临时线程无限增加的风险)
无界队列: 队列没有上限的, 当没有核心线程空闲的时候, 新来的任务可以无止境的向队列中添加, 而永远也不会创建临时线程.(警惕任务队列无限堆积的风险)
threadFactory
它是一个接口, 用于实现生成线程的方式, 定义线程名格式, 是否后台执行等等, 可以用 Executors.defaultThreadFactory() 默认的实现即可, 也可以用 Guava 等三方库提供的方法实现, 如果有特殊要求的话可以自己定义. 它最重要的地方应该就是定义线程名称的格式, 便于排查问题了吧.
handler
当没有空闲的线程处理任务, 并且等待队列已满(当然这只对有界队列有效), 再有新任务进来的话, 就要做一些取舍了, 而这个参数就是指定取舍策略的, 有下面四种策略可以选择:
ThreadPoolExecutor.AbortPolicy: 直接抛出异常, 这是默认策略;
ThreadPoolExecutor.DiscardPolicy: 直接丢弃任务, 但是不抛出异常.
ThreadPoolExecutor.DiscardOldestPolicy: 丢弃队列最前面的任务, 然后将新来的任务加入等待队列
ThreadPoolExecutor.CallerRunsPolicy: 由线程池所在的线程处理该任务, 比如在 main 函数中创建线程池, 如果执行此策略, 将有 main 线程来执行该任务
虽然并不提倡用 Executors 中的方法来创建线程池, 但还是用他们来讲一下几种线程池的原理.
1,newFixedThreadPool
它有两个重载方法, 代码如下:
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
- }
- public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(),
- threadFactory);
- }
建立一个线程数量固定的线程池, 规定的最大线程数量, 超过这个数量之后进来的任务, 会放到等待队列中, 如果有空闲线程, 则在等待队列中获取, 遵循先进先出原则.
创建固定线程数量线程池, corePoolSize 和 maximumPoolSize 要一致, 即核心线程数和最大线程数 (核心 + 非核心线程) 一致, Executors 默认使用的是 LinkedBlockingQueue 作为等待队列, 这是一个无界队列, 这也是使用它的风险所在, 除非你能保证提交的任务不会无节制的增长, 否则不要使用无界队列, 这样有可能造成等待队列无限增加, 造成 OOM.
正确的创建固定线程数线程池的做法是
- private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();
- public static ExecutorService createFixedThreadPool() {
- int poolSize = 5;
- int queueSize = 10;
- ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize), threadFactory, new ThreadPoolExecutor.AbortPolicy());
- return executorService;
- }
上面代码是创建一个 5 个线程的固定数量线程池, 这里线程存活时间没有作用, 所以设置为 0, 使用了 ArrayBlockingQueue 作为等待队列, 设置长度为 10 , 最多允许 10 个等待任务, 超过的任务会执行默认的 AbortPolicy 策略, 也就是直接抛异常. ThreadFactory 使用了 Guava 库提供的方法, 定义了线程名称, 方便之后排查问题.
2,newSingleThreadExecutor
建立一个只有一个线程的线程池, 如果有超过一个任务进来, 只有一个可以执行, 其余的都会放到等待队列中, 如果有空闲线程, 则在等待队列中获取, 遵循先进先出原则. 使用 LinkedBlockingQueue 作为等待队列.
这个方法同样存在等待队列无限长的问题, 容易造成 OOM, 所以正确的创建方式参考上面固定数量线程池创建的方式, 只是把 poolSize 设置为 1 .
3,newCachedThreadPool
缓存型线程池, 在核心线程达到最大值之前, 有任务进来就会创建新的核心线程, 并加入核心线程池, 即时有空闲的线程, 也不会复用. 达到最大核心线程数后, 新任务进来, 如果有空闲线程, 则直接拿来使用, 如果没有空闲线程, 则新建临时线程. 并且线程的允许空闲时间都很短, 如果超过空闲时间没有活动, 则销毁临时线程. 关键点就在于它使用 SynchronousQueue 作为等待队列, 它不会保留任务, 新任务进来后, 直接创建临时线程处理, 这样一来, 也就容易造成无限制的创建线程, 造成 OOM.
正确的创建缓存型线程池的做法是
- private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();
- public static ExecutorService createCacheThreadPool(){
- int coreSize = 10;
- int maxSize = 20;
- return new ThreadPoolExecutor(coreSize, maxSize, 10L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(), threadFactory, new ThreadPoolExecutor.AbortPolicy());
- }
- 4,newScheduledThreadPool
计划型线程池, 可以设置固定时间的延时或者定期执行任务, 同样是看线程池中有没有空闲线程, 如果有, 直接拿来使用, 如果没有, 则新建线程加入池. 使用的是 DelayedWorkQueue 作为等待队列, 这中类型的队列会保证只有到了指定的延时时间, 才会执行任务.
正确的创建缓存型线程池的做法是
- private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();
- private static CountDownLatch latch = new CountDownLatch(1);
- public static void main(String[] args) throws InterruptedException {
- Task task = new Task();
- ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, threadFactory);
- executorService.scheduleAtFixedRate(task,0L,5L, TimeUnit.SECONDS);
- latch.await();
- }
- static class Task implements Runnable{
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName() + "executing");
- }
- }
来源: https://www.cnblogs.com/fengzheng/p/9297602.html