目录
1 说明
1.1 类继承图
2 线程池的状态
3 源码分析
3.1 完整的线程池构造方法
3.2 ctl
3.3 任务的执行
- 3.3.1 execute(Runnable command)
- 3.3.2 addWorker(Runnable firstTask, boolean core)
- 3.3.3 runWorker(Worker w)
- 3.3.4 getTask()
4 任务执行, 带返回值的
5 参考资料
1 说明
下面如果有贴出源码, 对应的源码是 JDK8
主要的源码类
- java.util.concurrent.ThreadPoolExecutor,
- java.util.concurrent.ThreadPoolExecutor.Worker
- java.util.concurrent.AbstractExecutorService
1.1 类继承图
2 线程池的状态
3 源码分析
3.1 完整的线程池构造方法
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
- 3.2 ctl
内部有重要的成员变量 ctl, 类型是 AtomicInteger, 低 29 位表示线程池中线程数, 通过高 3 位表示线程池的运行状态
COUNT_BITS 的值是 29
1,RUNNING:-1 <<COUNT_BITS, 即高 3 位为 111, 该状态的线程池会接收新任务;
2,SHUTDOWN: 0 << COUNT_BITS, 即高 3 位为 000, 该状态的线程池不会接收新任务;
3,STOP : 1 << COUNT_BITS, 即高 3 位为 001;
4,TIDYING : 2 << COUNT_BITS, 即高 3 位为 010, 所有的任务都已经终止;
5,TERMINATED: 3 << COUNT_BITS, 即高 3 位为 011, terminated() 方法已经执行完成
3.3 任务的执行
- execute --> addWorker --> Thread.start --> (Thread.run) --> runTask --> getTask
- 3.3.1 execute(Runnable command)
大致分三个步骤
1, 当前运行的线程数量是否小于 corePoolSize, 直接尝试 addWorker()
2, 往阻塞队列里面放入 Runnable 任务
3, 如果队列已经满了, 直接尝试 addWorker()
3.3.2 addWorker(Runnable firstTask, boolean core)
1, 前置判断线程池的状态
2, 通过 CAS 操作让 ctl 加 1, 表示运行线程数增加 1 个
3, 构造一个 Worker w, 这里要特别注意构造方法里面的这行代码, this.thread = getThreadFactory().newThread(this), 可以看到构造方法内, 有一个 Thread 对象, 其使用了 ThreadFactory 构造了一个新的线程, 并且线程的 runable 是 worker 本身.
4, 执行 w.thread.start(), 也就是说, 当该线程被运行时, Worker 中的 run 方法会被执行
3.3.3 runWorker(Worker w)
通过循环调用 getTask() 获取要执行的任务 task
- beforeExecute
- task.run()
- afterExecute
- 3.3.4 getTask()
直接贴源码了
- private Runnable getTask() {
- boolean timedOut = false; // 是否最后的 poll() 超时了?
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- if (rs>= SHUTDOWN && (rs>= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- boolean timed = allowCoreThreadTimeOut || wc> corePoolSize; // worker 是否需要被淘汰
- if ((wc> maximumPoolSize || (timed && timedOut))
- && (wc> 1 || workQueue.isEmpty())) {
- // 这里会让线程的数量记录减, 后面的 return null, 会导致 runWorker 没有获取到数据而让 run() 方法走到尽头, 最终当前线程结束
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- // 如果需要回收一部分线程, 那么超时时间 keepAliveTime 后拿不到就数据就继续循环调用, 就可以在下一次循环的时候进行线程结束回收了; 否则一直阻塞下去
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
4 任务执行, 带返回值的
直接贴源码了
- public <T> Future<T> submit(Callable<T> task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task);
- execute(ftask);
- return ftask;
- }
- public Future<?> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<Void> ftask = newTaskFor(task, null);
- execute(ftask);
- return ftask;
- }
代码比较简单, 把任务封装成一个既实现 Runnable, 也实现 Future 的接口, 这个时候就可以调用 execute() 进行实现了
5 参考资料
来源: https://www.cnblogs.com/powercto/p/11182754.html