背景
公司业务性能优化, 使用 java 自带的 Executors.newFixedThreadPool() 方法生成线程池. 但是其内部定义的 LinkedBlockingQueue 容量是 Integer.MAX_VALUE. 考虑到如果数据库中待处理数据量很大有可能会在短时间内往 LinkedBlockingQueue 中填充很多数据, 导致内存溢出. 于是看了一下线程池这块的源码, 并在此记录.
类图
Executor 是一个顶层接口, 在它里面只声明了一个方法 execute(Runnable), 返回值为 void, 参数为 Runnable 类型, 从字面意思可以理解, 就是用来执行传进去的任务的;
ExecutorService 接口继承了 Executor 接口, 并声明了一些方法: submit,invokeAll,invokeAny 以及 shutDown 等
抽象类 AbstractExecutorService 实现了 ExecutorService 接口, 基本实现了 ExecutorService 中声明的所有方法; submit() 方法
ThreadPoolExecutor 继承了类 AbstractExecutorService. 实现了 execute(Runnable) 方法.
Executors 提供的集中工厂方法都是调用的 ThreadPoolExecutor 的构造方法. 因为这个构造方法参数比较多 所以提供了几个经典的实现.
- ExecutorService newCachedThreadPool = Executors.newFixedThreadPool();
- ExecutorService newCachedThreadPool = Executors.newSingleThreadExecutor();
- ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
- ExecutorService newCachedThreadPool = Executors.newScheduledThreadPool();
本篇违章主要包括以下几点内容. 这也是解决背景中提到的问题的主要历程.
1.ThreadPoolExecutor 构造方法
2.ExecutorService submit() 方法的实现
2.Executor execute() 方法的实现
3.reject() 拒绝策略
ThreadPoolExecutor 构造方法
构造方法中赋值的成员标量:
- // 构造方法中用到的成员变量
- private volatile int corePoolSize; // 核心池的大小 (即线程池中的线程数目大于这个参数时, 提交的任务会被放进任务缓存队列)
- private volatile int maximumPoolSize; // 线程池最大能容忍的线程数
- private volatile long keepAliveTime; // 线程空闲之后存货时间 (线程数量大于 corePoolSize 之后)
- private final BlockingQueue<Runnable> workQueue; // 任务缓存队列, 用来存放等待执行的任务
- private volatile ThreadFactory threadFactory; // 线程工厂, 用来创建线程
- private volatile RejectedExecutionHandler handler; // 任务拒绝策略
通过代码可以知道 Executors 提供的集中工厂方法实际都是调用的同一个 ThreadPoolExecutor 的构造方法. 当然我们也可以通过自己调用 ThreadPoolExecutor 构造方法 自己设置参数 从而获得很贴合我们业务的线程池.
AbstractExecutorService submit() 方法
- /**
- * @throws RejectedExecutionException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- */
- public <T> Future<T> submit(Callable<T> task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task);
- execute(ftask);
- return ftask;
- }
其实是调用了 execute() 方法, execute() 方法 由 ThreadPoolExecutor 类实现.
ThreadPoolExecutor execute() 方法
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // 29 位
- private static final int COUNT_BITS = Integer.SIZE - 3;
- // 0001 1111 1111 1111 1111 1111 1111 1111
- private static final int CAPACITY = (1 <<COUNT_BITS) - 1;
- // runState is stored in the high-order bits
- private static final int RUNNING = -1 << COUNT_BITS;
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- private static final int STOP = 1 << COUNT_BITS;
- private static final int TIDYING = 2 << COUNT_BITS;
- private static final int TERMINATED = 3 << COUNT_BITS;
- // Packing and unpacking ctl
- // 高三位 代表 状态
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- // 低三位 代表 数量
- private static int workerCountOf(int c) { return c & CAPACITY; }
- // 把状态和数量两个值 揉在一起
- // private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- private static int ctlOf(int rs, int wc) { return rs | wc; }
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- // 获取到当前有效的线程数和线程池的状态
- int c = ctl.get();
- // 1. 获取当前正在运行线程数是否小于核心线程池, 是则新创建一个线程执行任务, 否则将任务放到任务队列中
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- // 2. 当前核心线程池中全部线程都在运行 workerCountOf(c)>= corePoolSize, 所以此时将线程放到任务队列中
- // 线程池是否处于运行状态, 且是否任务插入任务队列成功. 注意这块 && 是做了优化如果前面条件失败后面语句不会处理
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- // 在此检查线程池是否处于运行状态, 如果不是则使刚刚的任务出队. 和上面一样 && 是做了优化如果前面条件失败后面语句不会处理
- if (! isRunning(recheck) && remove(command))
- reject(command);
- // 如果没有执行的线程, 就再开启一个线程 (有可能没有核心线程)
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 3. 插入队列不成功 offer() 方法失败是因为队列满了, 此时就新创建线程去执行任务, 创建失败抛出异常
- else if (!addWorker(command, false))
- reject(command);
- }
- // CAS 修改 clt 的值 + 1, 成功退出 cas 循环, 失败继续
- if (compareAndIncrementWorkerCount(c))
- break retry;
- // 将新建的线程加入到线程池中
- workers.add(w);
- int s = workers.size();
- // 修正 largestPoolSize 的值
- if (s> largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
addWorker() 方法 总结起来就两部分
1.CAS + 失败重试操作来将线程数加 1
2. 新建一个线程并启用.
RejectedExecutionHandler 拒绝策略
java 内置的四种拒绝策略.
- public static class AbortPolicy implements RejectedExecutionHandler // 抛出 java.util.concurrent.RejectedExecutionException 异常
- public static class CallerRunsPolicy implements RejectedExecutionHandler // 直接在 execute 方法的调用线程中运行被拒绝的任务. 如果执行程序已关闭, 则会丢弃该任务
- public static class DiscardPolicy implements RejectedExecutionHandler // 不做任何处理 直接丢弃
- public static class DiscardOldestPolicy implements RejectedExecutionHandler // 丢弃老的
自定义拒绝策略:
- new RejectedExecutionHandler() {
- // 自定义拒绝策略
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- try {
- // 如果 LinkedBlockingQueue 存满了, 阻塞等待有空间后再加入元素.(put 方法是阻塞的)
- LOGGER.info("LinkedBlockingQueue has been full");
- // put() 方法是阻塞的, 如果队列没有空间会一直等待.
- executor.getQueue().put(r);
- LOGGER.info("thread has been put in");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
总结一点: 当用 java 内置的一些工具的时候, 如果有不理解的一定要 深入去看源码. 从根本上找解决思路.
来源: https://www.cnblogs.com/Vdiao/p/10426982.html