承上启下: 上一篇文章小豹子讲了线程池的实例化过程, 粗略介绍了线程池的状态转换; 这篇文章主要讲了我运行线程池时遇到的小问题, 以及 execute 方法的源码理解
4 并不算疑难的 Bug
按照我们的规划, 下一步就应该提交任务, 探究线程池执行任务时的内部行为, 但首先, 我要提交一个任务嘛于是, 接着上一篇文章的代码, 我提交了一个任务:
- @Test
- public void submitTest() {
- // 创建线程池
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread();
- }
- }, new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- System.out.println("拒绝服务");
- }
- });
- // 提交任务, 该任务为睡眠 1 秒后打印 Hello
- threadPoolExecutor.submit(new Callable<String>() {
- @Override
- public String call() throws InterruptedException {
- Thread.sleep(1000L);
- System.out.println("Hello");
- return null;
- }
- });
- }
而我并没有看到任何输出, 程序也并没有睡眠一秒, 而是马上结束了哦对, 我想起来, 我们创建的线程默认是守护线程, 当所有用户线程结束之后, 程序就会结束了, 并不会理会是否还有守护线程在运行那么我们用一个简单易行的办法来解决这个问题 不让用户线程结束, 让它多睡一会:
- @Test
- public void submitTest() throws InterruptedException {
- // 创建线程池
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread();
- }
- }, new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- System.out.println("拒绝服务");
- }
- });
- // 提交任务, 该任务为睡眠 1 秒后打印 Hello
- threadPoolExecutor.submit(new Callable<String>() {
- @Override
- public String call() throws InterruptedException {
- Thread.sleep(1000L);
- System.out.println("Hello");
- return null;
- }
- });
- // 使主线程休眠 5 秒, 防止守护线程意外退出
- Thread.sleep(5000L);
- }
然而, 程序等待 5 秒之后, 依旧没有输出我的第一个反应是, 我对于线程池的用法不对是不是还需要调用某个方法来激活或者启动线程池? 而无论在文档中, 还是各博客的例子中, 我都没有找到类似的方法我们仔细思考一下这个 Bug, 产生这样问题的可能原因有三:
ThreadPoolExecutor 内部代码有问题
我对 ThreadPoolExecutor 的使用方法不对
我设计的 ThreadFactory 或
RejectedExecutionHandler
有问题
原因 1, 可能性太小, 几乎没有那么原因 23, 我们现在没法排除, 于是我尝试构建一个最小可重现错误, 将 ThreadPoolExecutor 剥离出来, 看 Bug 是否重现:
最小可重现 (minimal reproducible) 这个思想是我在翻译使用 Rust 开发一个简单的 web 应用, 第 4 部分 CLI 选项解析时, 作者用到的思想就是在我们无法定位 Bug 时, 剥离出当前代码中我们认为无关的部分, 剥离后观察 Bug 是否重现, 一步步缩小 Bug 的范围通俗的说, 就是排除法
- private class MyThreadFactory implements ThreadFactory{
- @Override
- public Thread newThread(Runnable r) {
- return new Thread();
- }
- }
- @Test
- public void reproducibleTest() throws InterruptedException {
- new MyThreadFactory().newThread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("Hello");
- }
- }).start();
- Thread.sleep(5000L);
- }
还是没有任何输出, 不过这是一个好消息, 这意味着我们定位了问题所在: 现在问题只可能出现在 MyThreadFactory 中, 短短 6 行代码会有什么问题呢? 哎呦(拍大腿), 我没有把 Runnable r 传给 new Thread() 啊, 我一直在执行一个空线程啊, 怎么可能有任何输出! 于是:
return new Thread(r);
这样一改就好了
5 重构
上面的问题看似简单, 但能出现这么低级的错误, 值得我思考我因为产生该错误的原因有二:
我不了解 ThreadPoolExecutor 的原理, 从语法上看 ThreadFactory 的实现类只需要传出一个 Thread 实例就行了, 却不知 Runnable r 不可或缺
测试代码结构凌乱不堪即便是测试代码, 也不应该写成面条, 自己看尚不能清楚明了, 何谈读者?
于是, 我决定对测试代码进行重构这次重构, 一要使线程工厂产生非守护线程, 防止因为主进程的退出导致线程池中线程全部意外退出; 二要对每个操作打日志, 我们要能直观的观察到线程池在做什么, 值得一提的是, 对于阻塞队列的日志操作, 我使用了动态代理的方式对每一个方法打日志, 不熟悉动态代理的童鞋可以戳我之前写的小豹子带你看源码: JDK 动态代理
- // import...
- public class ThreadPoolExecutorTest {
- /**
- * 记录启动时间
- */
- private final static long START_TIME = System.currentTimeMillis();
- /**
- * 自定义线程工厂, 产生非守护线程, 并打印日志
- */
- private class MyThreadFactory implements ThreadFactory {
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r);
- thread.setDaemon(false);
- debug("创建线程 - %s", thread.getName());
- return thread;
- }
- }
- /**
- * 自定义拒绝服务异常处理器, 打印拒绝服务信息
- */
- private class MyRejectedExecutionHandler implements RejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- debug("拒绝请求, Runnable:%s,ThreadPoolExecutor:%s", r, executor);
- }
- }
- /**
- * 自定义任务, 休眠 1 秒后打印当前线程名, 并返回线程名
- */
- private class MyTask implements Callable<String> {
- @Override
- public String call() throws InterruptedException {
- Thread.sleep(1000L);
- String threadName = Thread.currentThread().getName();
- debug("MyTask - %s", threadName);
- return threadName;
- }
- }
- /**
- * 对 BlockingQueue 的动态代理, 实现对 BlockingQueue 的所有方法调用打 Log
- */
- private class PrintInvocationHandler implements InvocationHandler {
- private final BlockingQueue<?> blockingQueue;
- private PrintInvocationHandler(BlockingQueue<?> blockingQueue) {
- this.blockingQueue = blockingQueue;
- }
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- debug("BlockingQueue - %s, 参数为:%s", method.getName(), Arrays.toString(args));
- Object result = method.invoke(blockingQueue, args);
- debug("BlockingQueue - %s 执行完毕, 返回值为:%s", method.getName(), String.valueOf(result));
- return result;
- }
- }
- /**
- * 产生 BlockingQueue 代理类
- * @param blockingQueue 原 BlockingQueue
- * @param <E> 任意类型
- * @return 动态代理 BlockingQueue, 执行任何方法时会打 Log
- */
- @SuppressWarnings("unchecked")
- private <E> BlockingQueue<E> debugQueue(BlockingQueue<E> blockingQueue) {
- return (BlockingQueue<E>) Proxy.newProxyInstance(this.getClass().getClassLoader(),
- new Class<?>[]{BlockingQueue.class},
- new PrintInvocationHandler(blockingQueue));
- }
- /**
- * 实例化一个 核心池为 3, 最大池为 5, 存活时间为 20s, 利用上述阻塞队列线程工厂拒绝服务处理器的线程池实例
- * @return 返回 ThreadPoolExecutor 实例
- */
- private ThreadPoolExecutor newTestPoolInstance() {
- return new ThreadPoolExecutor(3, 5, 20,
- TimeUnit.SECONDS, debugQueue(new LinkedBlockingQueue<>()),
- new MyThreadFactory(), new MyRejectedExecutionHandler());
- }
- /**
- * 向控制台打印日志, 自动输出时间, 线程等信息
- * @param info
- * @param arg
- */
- private void debug(String info, Object... arg) {
- long time = System.currentTimeMillis() - START_TIME;
- System.out.println(String.format(((double) time / 1000) + "-" + Thread.currentThread().getName() + "-" + info, arg));
- }
- /**
- * 测试实例化操作
- */
- private void newInstanceTest() {
- newTestPoolInstance();
- }
- /**
- * 测试提交操作, 提交 10 次任务
- */
- private void submitTest() {
- ThreadPoolExecutor threadPool = newTestPoolInstance();
- for (int i = 0; i < 10; i++) {
- threadPool.submit(new MyTask());
- }
- }
- public static void main(String[] args) {
- ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
- test.submitTest();
- }
- }
编译, 运行 =>
0.047-main - 创建线程 - Thread-0
0.064-main - 创建线程 - Thread-1
0.064-main - 创建线程 - Thread-2
0.064-main-BlockingQueue - offer, 参数为:[java.util.concurrent.FutureTask@4d7e1886]
0.064-main-BlockingQueue - offer 执行完毕, 返回值为: true
0.064-main-BlockingQueue - offer, 参数为:[java.util.concurrent.FutureTask@3cd1a2f1]
0.065-main-BlockingQueue - offer 执行完毕, 返回值为: true
0.065-main-BlockingQueue - offer, 参数为:[java.util.concurrent.FutureTask@2f0e140b]
0.065-main-BlockingQueue - offer 执行完毕, 返回值为: true
0.065-main-BlockingQueue - offer, 参数为:[java.util.concurrent.FutureTask@7440e464]
0.065-main-BlockingQueue - offer 执行完毕, 返回值为: true
0.065-main-BlockingQueue - offer, 参数为:[java.util.concurrent.FutureTask@49476842]
0.065-main-BlockingQueue - offer 执行完毕, 返回值为: true
0.065-main-BlockingQueue - offer, 参数为:[java.util.concurrent.FutureTask@78308db1]
0.065-main-BlockingQueue - offer 执行完毕, 返回值为: true
0.065-main-BlockingQueue - offer, 参数为:[java.util.concurrent.FutureTask@27c170f0]
0.065-main-BlockingQueue - offer 执行完毕, 返回值为: true
- 1.065-Thread-1-MyTask - Thread-1
- 1.065-Thread-0-MyTask - Thread-0
- 1.065-Thread-2-MyTask - Thread-2
1.065-Thread-1-BlockingQueue - take, 参数为: null
1.065-Thread-0-BlockingQueue - take, 参数为: null
1.065-Thread-2-BlockingQueue - take, 参数为: null
1.065-Thread-0-BlockingQueue - take 执行完毕, 返回值为: java.util.concurrent.FutureTask@3cd1a2f1
1.065-Thread-2-BlockingQueue - take 执行完毕, 返回值为: java.util.concurrent.FutureTask@2f0e140b
1.065-Thread-1-BlockingQueue - take 执行完毕, 返回值为: java.util.concurrent.FutureTask@4d7e1886
- 2.065-Thread-1-MyTask - Thread-1
- 2.065-Thread-2-MyTask - Thread-2
- 2.065-Thread-0-MyTask - Thread-0
2.065-Thread-1-BlockingQueue - take, 参数为: null
2.065-Thread-2-BlockingQueue - take, 参数为: null
2.065-Thread-0-BlockingQueue - take, 参数为: null
2.065-Thread-1-BlockingQueue - take 执行完毕, 返回值为: java.util.concurrent.FutureTask@7440e464
2.065-Thread-2-BlockingQueue - take 执行完毕, 返回值为: java.util.concurrent.FutureTask@49476842
2.065-Thread-0-BlockingQueue - take 执行完毕, 返回值为: java.util.concurrent.FutureTask@78308db1
- 3.066-Thread-1-MyTask - Thread-1
- 3.066-Thread-2-MyTask - Thread-2
- 3.066-Thread-0-MyTask - Thread-0
3.066-Thread-2-BlockingQueue - take, 参数为: null
3.066-Thread-1-BlockingQueue - take, 参数为: null
3.066-Thread-0-BlockingQueue - take, 参数为: null
3.066-Thread-2-BlockingQueue - take 执行完毕, 返回值为: java.util.concurrent.FutureTask@27c170f0
4.067-Thread-2-MyTask - Thread-2
4.067-Thread-2-BlockingQueue - take, 参数为: null
日志的格式是: 时间(秒)- 线程名 - 信息
从日志输出中, 我们可以获知:
当队列为空, 线程数少于核心线程数时, 提交任务会触发创建线程, 并立即执行任务
当核心线程均忙, 再提交的请求会被存储至阻塞队列, 等待线程空闲后执行队列中的任务
除主线程外, 始终只有三个工作线程
当队列为空, 工作线程还在运行的时候, 工作线程会因为阻塞队列的 take 方法阻塞(这一点由日志后几行可以看出, 只有调用日志, 没有调用完成的日志)
由此, 我产生一个疑问: 为什么始终只有三个线程? 我的设置不是核心池为 3, 最大池为 5 吗? 为什么只有三个线程在工作呢?
6 submit 任务
终于开始看源码了, 我们以 submit 为切入点, 探寻我们提交任务时, 线程池做了什么, submit 方法本身很简单, 就是将传入参数封装为 RunnableFuture 实例, 然后调用 execute 方法, 以下给出 submit 多个重载方法其中之一:
- public < T > Future < T > submit(Callable < T > task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture < T > ftask = newTaskFor(task);
- execute(ftask);
- return ftask;
- }
那么, 我们继续看 execute 的代码:
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- else if (!addWorker(command, false))
- reject(command);
- }
我们首先解释一下 addWorker 方法, 暂时我们只需要了解几件事情就可以理解 execute 代码了:
该方法用于新建一个工作线程
该方法线程安全
该方法第一个参数是新线程要执行的第一个任务, 第二个参数是是否新建核心线程
该方法如果新建线程成功, 则返回 true, 否则返回 false
那么我们回过头来理解 execute 代码:
为了帮助理解, 我根据代码逻辑画了一个流程图:
现在我明白了, 只有等待队列插入失败 (如达到容量上限等) 情况下, 才会创建非核心线程来处理任务, 也就是说, 我们使用的
LinkedBlockingQueue
队列来作为等待队列, 那是看不到非核心线程被创建的现象的
有心的读者可能注意到了, 整个过程没有加锁啊, 怎样保证并发安全呢? 我们观察这段代码, 其实没必要全部加锁, 只需要保证 addWorkerremove 和 workQueue.offer 三个方法的线程安全, 该方法就没必要加锁事实上, 在 addWorker 中是有对线程池状态的 recheck 的, 如果创建失败会返回 false
来源: https://juejin.im/post/5a7bd8b2f265da4e790ffa98