一, 前言
随着业务的发展, 单线程已经远远不能满足, 随即就有多线程的出现. 多线程虽然能解决单线程解决不了的事情, 但是它也会给你带来额外的问题. 比如成千上万甚至上百万的线程时候, 你系统就会出现响应延迟, 卡机, 甚至直接卡死的情况. 为什么会出现这样的原因呢? 因为为每个请求创建一个新线程的开销很大: 在创建和销毁线程上花费的时间和消耗的系统资源要比花在处理实际的用户请求的时间和资源更多.
除了创建和销毁线程的开销之外, 活动的线程也消耗系统资源. 在一个 JVM 里创建太多的线程可能会导致系统由于过度消耗内存而用完内存或 "切换过度". 所以为了防止资源不足, 服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目. 而线程池为线程生命周期开销问题和资源不足问题提供了解决方案.
二, 那么线程池有哪些作用呢?
1, 降低资源消耗, 防止资源不足. 合理配置线程池中的线程大小, 防止请求线程猛增; 另外通过重复利用已创建的线程降低线程创建和销毁造成的消耗.
2, 提高响应速度. 线程池可以通过对多个任务重用线程, 在请求到达时线程已经存在(如果有空闲线程时), 所以无意中也消除了线程创建所带来的延迟. 这样, 就可以立即为请求服务, 使应用程序响应更快.
3, 提高线程的可管理性. 使用线程池可以统一分配, 调优和监控线程.
上面知道了线程池的作用, 那么线程池它是如何工作的呢? 其使用核心类是哪一个呢? 所以要做到合理利用线程池, 必须对其实现原理了如指掌.
三, 线程池中核心类: ThreadPoolExecutor
java.uitl.concurrent.ThreadPoolExecutor 类是线程池中最核心的一个类, 所以必须了解这个类的用法及其内部原理, 下面我们来看下 ThreadPoolExecutor 类的具体源码解析.
3.1 继承关系
通过类的继承关系可以得知哪些方法源于哪里(具体请看代码), 下面直接给出类的继承结构的图:
3.2 构造方法
在 ThreadPoolExecutor 类中提供了四个构造方法:
- // 五个参数的构造函数
- public class ThreadPoolExecutor extends AbstractExecutorService {
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
- Executors.defaultThreadFactory(), defaultHandler);
- }
- // 六个参数的构造函数 - 1
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
- threadFactory, defaultHandler);
- }
- // 六个参数的构造函数 -2
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- RejectedExecutionHandler handler) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
- Executors.defaultThreadFactory(), handler);
- }
- // 七个参数的构造函数
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- if (corePoolSize <0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
- View Code
从源代码中发现前面三个构造器都是调用的第四个构造器进行的初始化工作, 那就以第四个构造函数为例, 解释下其中各个参数的含义(留意源码中每个字段上的注释):
int corePoolSize: 核心线程数
在创建了线程池后, 默认情况下线程池中并没有任何线程, 而是等待有任务到来才创建线程去执行任务;
当提交一个任务到线程池时, 线程池会创建一个线程来执行任务, 即使有其他空闲的基本线程能够执行新任务也会创建线程(比方说: coreSize=5 时, 一开始只有一个任务会创建一个线程, 等执行完后又来了一个任务时, 依然会创建一个线程不会使用第一个线程)
当线程池中的线程数目达到 corePoolSize 后就不再创建线程, 会把到达的任务放到缓存队列当中等待执行. 如果调用了线程池的 prestartAllCoreThreads 方法, 线程池会提前创建并启动所有基本线程, 另外 prestartCoreThread 方法也会启动核心线程, 不过每次只能启动一个.
2. int maximumPoolSize: 线程池允许创建的最大线程数.
如果队列满了, 并且已创建的线程数小于最大线程数, 则线程池会再创建新的线程执行任务. 值得注意的是如果使用了无界的任务队列这个参数就没什么效果.
3. long keepAliveTime: 空闲线程等待超时的时间
默认情况下, 只有当线程池中的线程数大于 corePoolSize 时, keepAliveTime 才会起作用, 直到线程池中的线程数不大于 corePoolSize, 即当线程池中的线程数大于 corePoolSize 时, 如果一个线程空闲的时间达到 keepAliveTime, 则会终止, 直到线程池中的线程数不超过 corePoolSize.
但是如果调用了 allowCoreThreadTimeOut(boolean)方法, 在线程池中的线程数不大于 corePoolSize 时, keepAliveTime 参数也会起作用, 直到线程池中的空闲线程数为 0;
如果任务很多且每个任务执行的时间比较短, 则可以调大时间, 提高线程利用率.
4. TimeUnit unit: 参数 keepAliveTime 的时间单位. 共有七种单位, 如下:
- public enum TimeUnit {
- /**
- * 纳秒 = 千分之一微妙
- */
- NANOSECONDS {...},
- /**
- * 微妙 = 千分之一毫秒
- */
- MICROSECONDS {...},
- /**
- * 毫秒
- */
- MILLISECONDS {...},
- /**
- * 秒
- */
- SECONDS {...},
- /**
- * 分钟
- */
- MINUTES {...},
- /**
- * 小时
- */
- HOURS {...},
- /**
- * 天
- */
- DAYS {...};
- }
- View Code
5. BlockingQueue<Runnable> workQueue: 任务队列, 用于保存等待执行任务的阻塞队列. 队列也有好几种详细请看这里 http://ifeve.com/java-blocking-queue/ , 这里就不做解释了.
6. ThreadFactory threadFactory: 线程工厂, 主要用于创建线程. 其中可以指定线程名字(千万别忽略这件小事, 有意义的名字能让你快速定位到源码中的线程类)
7. RejectedExecutionHandler handler: 饱和策略, 当队列和线程池都满了, 说明线程处于饱和状态, 那么后续进来的任务需要一种策略处理. 默认情况下是 AbortPolicy: 表示无法处理新任务时抛出异常. 线程池框架提供了以下 4 中策略(当然也可以自己自定义策略: 通过实现 RejectedExecutionHandler 接口自定义策略):
AbortPolicy: 不处理新任务, 抛出异常
CallerRunsPolicy: 只用调用者所在的线程来运行任务.
DiscardOldestPolicy: 丢弃队列里最近的一个任务, 并执行当前任务.
DiscardPolicy: 不处理, 丢弃掉.
3.3 重要参数方法和方法解读
1. 线程池状态
- // 初始值 -536870912
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // 初始值 29
- private static final int COUNT_BITS = Integer.SIZE - 3;
- // 初始值 536870911
- private static final int CAPACITY = (1 <<COUNT_BITS) - 1;
- // RUNNING 状态: 接受新任务并处理排队任务
- private static final int RUNNING = -1 << COUNT_BITS;
- // SHUTDOWN 状态: 不接受新任务, 但处理排队任务
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- // STOP 状态: 不接受新任务, 不处理排队任务, 并中断正在进行的任务
- private static final int STOP = 1 << COUNT_BITS;
- // All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
- private static final int TIDYING = 2 << COUNT_BITS;
- // TERMINATED: terminated() has completed
- private static final int TERMINATED = 3 << COUNT_BITS;
- // 获取线程池状态, 取前三位
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- // 获取当前正在工作的 worker, 主要是取后面 29 位
- private static int workerCountOf(int c) { return c & CAPACITY; }
- // 生成 ctl
- private static int ctlOf(int rs, int wc) { return rs | wc; }
当创建线程池后, 初始时, 线程池处于 RUNNING 状态;
RUNNING -> SHUTDOWNN: 如果调用了 shutdown()方法, 则线程池处于 SHUTDOWN 状态, 此时线程池不能够接受新的任务, 它会等待所有任务执行完毕;
(RUNNING or SHUTDOWN) -> STOP: 如果调用了 shutdownNow()方法, 则线程池处于 STOP 状态, 此时线程池不能接受新的任务, 并且会去尝试终止正在执行的任务;
SHUTDOWN -> TIDYING or STOP -> TIDYING : 当线程池处于 SHUTDOWN 或 STOP 状态, 并且所有工作线程已经销毁, 任务缓存队列已经清空或执行结束后, 线程池被设置为 TERMINATED 状态.
2. 线程池中的线程初始化
在说 corePoolSize 参数时有说到初始化线程池的两个方法, 其实在默认情况下, 创建线程池之后线程池中是没有线程的, 需要提交任务之后才会创建线程. 所以如果想在创建线程池之后就创建线程的话, 可以通过下面两个方法创建:
- /**
- * 单个创建核心线程
- */
- public boolean prestartCoreThread() {
- return workerCountOf(ctl.get()) <corePoolSize &&
- addWorker(null, true);
- }
- /**
- * 启动所有核心线程
- */
- public int prestartAllCoreThreads() {
- int n = 0;
- // 添加工作线程
- while (addWorker(null, true))
- ++n;
- return n;
- }
3. 创建线程: addWorker()
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- // 获取运行状态
- int rs = runStateOf(c);
- /**
- * 如果当前的线程池的状态 > SHUTDOWN 那么拒绝 Worker 的 add 如果 = SHUTDOWN
- * 那么此时不能新加入不为 null 的 Task, 如果在 WorkCount 为 empty 的时候不能加入任何类型的 Worker,
- * 如果不为 empty 可以加入 task 为 null 的 Worker, 增加消费的 Worker
- */
- if (rs>= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- for (;;) {
- // 获取有效线程数, 并判断 // 如果当前的数量超过了 CAPACITY, 或者超过了 corePoolSize 和 maximumPoolSize(试 core 而定), 则直接返回
- int wc = workerCountOf(c);
- if (wc>= CAPACITY ||
- wc>= (core ? corePoolSize : maximumPoolSize))
- return false;
- // //CAS 尝试增加线程数, 如果失败, 证明有竞争, 那么重新到 retry.
- if (compareAndIncrementWorkerCount(c))
- break retry;
- c = ctl.get(); // Re-read ctl
- // 继续判断当前线程池的运行状态
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- /**
- * 新建任务
- */
- Worker w = new Worker(firstTask);
- Thread t = w.thread;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int c = ctl.get();
- int rs = runStateOf(c);
- /**
- * rs!=SHUTDOWN ||firstTask!=null
- *
- * 同样检测当 rs>SHUTDOWN 时直接拒绝减小 Wc, 同时 Terminate, 如果为 SHUTDOWN 同时 firstTask 不为 null 的时候也要 Terminate
- */
- if (t == null ||
- (rs>= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null))) {
- decrementWorkerCount();
- tryTerminate();
- return false;
- }
- workers.add(w);
- int s = workers.size();
- if (s> largestPoolSize)
- largestPoolSize = s;
- } finally {
- mainLock.unlock();
- }
- t.start();
- //Stop 或线程 Interrupt 的时候要中止所有的运行的 Worker
- if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
- t.interrupt();
- return true;
- }
从上面可以看出:
在 rs>SHUTDOWN 时, 拒绝一切线程的增加, 因为 STOP 是会终止所有的线程, 同时移除 Queue 中所有的待执行的线程的, 所以也不需要增加 first=null 的 Worker 了. 其次, 在 SHUTDOWN 状态时, 是不能增加 first!=null 的 Worker 的, 同时即使 first=null, 但是此时 Queue 为 Empty 也是不允许增加 Worker 的, SHUTDOWN 下增加的 Worker 主要用于消耗 Queue 中的任务. SHUTDOWN 状态时, 是不允许向 workQueue 中增加线程的, isRunning(c) && workQueue.offer(command) 每次在 offer 之前都要做状态检测, 也就是线程池状态变为>=SHUTDOWN 时不允许新线程进入线程池了.
4, 执行任务: execute()
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- /*
- * Proceed in 3 steps:
- *
- * 1. If fewer than corePoolSize threads are running, try to
- * start a new thread with the given command as its first
- * task. The call to addWorker atomically checks runState and
- * workerCount, and so prevents false alarms that would add
- * threads when it shouldn't, by returning false.
- *
- * 2. If a task can be successfully queued, then we still need
- * to double-check whether we should have added a thread
- * (because existing ones died since last checking) or that
- * the pool shut down since entry into this method. So we
- * recheck state and if necessary roll back the enqueuing if
- * stopped, or start a new thread if there are none.
- *
- * 3. If we cannot queue task, then we try to add a new
- * thread. If it fails, we know we are shut down or saturated
- * and so reject the task.
- * 原注释已经讲的很清楚了, 主要分三步进行:
- */
- int c = ctl.get();
- // 1, 如果线程数小于基本线程数, 则创建线程并执行当前任务
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- // 2, 如果任务可以排队, 则会重新检查看是否可以启动新的任务还是拒绝任务
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 3, 如果我们无法排队任务, 那么我们尝试添加一个新线程. 如果失败, 我们知道我们已关闭或饱和, 因此拒绝该任务.
- else if (!addWorker(command, false))
- reject(command);
- }
注意: 该方法是没有返回值的, 如果想获取线程执行后的结果可以调用 submit 方法 (当然它底层也是调用 execute() 方法)
5, 线程池关闭:
ThreadPoolExecutor 提供了两个方法, 用于线程池的关闭, 分别是 shutdown()和 shutdownNow(), 其中:
shutdown(): 不会立即终止线程池, 而是要等所有任务缓存队列中的任务都执行完后才终止, 但再也不会接受新的任务
shutdownNow(): 立即终止线程池, 并尝试打断正在执行的任务, 并且清空任务缓存队列, 返回尚未执行的任务
四, 关于线程池使用的注意事项
1, 创建线程或线程池时请指定有意义的线程名称, 方便回溯. 来源《阿里巴巴 Java 开发手册》
2, 线程池不允许使用 Executors 去创建, 而是通过 ThreadPoolExecutor 的方式, 这样 的处理方式让写的同学更加明确线程池的运行规则, 规避资源耗尽的风险. 来源《阿里巴巴 Java 开发手册》
说明: Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE, 可能会堆积大量的请求, 从而导致 OOM.
2)CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程, 从而导致 OOM.
3, 合理配置线程池大小, 可以从以下几个角度来进行分析:
任务的性质: CPU 密集型任务, IO 密集型任务和混合型任务.
任务的优先级: 高, 中和低.
任务的执行时间: 长, 中和短.
任务的依赖性: 是否依赖其他系统资源, 如数据库连接.
比方说: 如果是 CPU 密集型任务, 就需要尽量压榨 CPU, 参考值可以设为 NCPU+1;
如果是 IO 密集型任务, 参考值可以设置为 2*NCPU.
注意: 以上值仅供参考, 需要根据具体实际情况而定.
4, 合理设置空闲线程等待时间.
如果任务很多且每个任务执行的时间比较短, 则可以调大时间, 提高线程利用率.
五, 总结 & 推荐学习
上面说了这么多还是需要实践才能知道其具体的作用印象才会更深刻, 本想写个案例出来的(后续补上), 没时间想个好点例子.
下面推荐学习资料:
书籍:《java 并发编程的艺术》
视频: 通过我下面分享的视频, 购买有返现.
六, 参考资料
- https://www.cnblogs.com/dolphin0520/p/3932921.html
- http://ifeve.com/java-threadpool/
《Java 并发编程的艺术》
《阿里巴巴 Java 开发手册》
来源: http://www.bubuko.com/infodetail-2974426.html