前言
最近被问到了线程池的相关问题. 于是准备开始写一些多线程相关的文章. 这篇将介绍一下线程池的基本使用.
Executors
Executors 是 concurrent 包下的一个类, 为我们提供了创建线程池的简便方法.
Executors 可以创建我们常用的四种线程池:
(1)newCachedThreadPool 创建一个可缓存线程池, 如果线程池长度超过处理需要, 可灵活回收空闲线程, 若无可回收, 则新建线程. 不设上限, 提交的任务将立即执行.
(2)newFixedThreadPool 创建一个定长线程池, 可控制线程最大并发数, 超出的线程会在队列中等待.
(3)newScheduledThreadPool 创建一个定长线程池, 支持定时及周期性任务执行.
(4)newSingleThreadExecutor 创建一个单线程化的线程池执行任务.
Executors 的坏处
正常来说, 我们不应该使用这种方式创建线程池, 应该使用 ThreadPoolExecutor 来创建线程池. Executors 创建的线程池也是调用的 ThreadPoolExcutor 的构造函数. 通过原来可以看出.
我们也看到了这里面的 LinkedBlockingQueue 并没有指定队列的大小是一个无界队列, 这样可能会造成 oom. 所以我们要使用 ThreadPoolExecutor 这种方式.
ThreadPoolExecutor
通过源码看到 ThreadPoolExecutor 比较全的构造函数如下:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- if (corePoolSize <0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
分别解释一下参数的意义
corePoolSize: 线程池长期维持的线程数, 即使线程处于 Idle 状态, 也不会回收.
maximumPoolSize: 线程数的上限
keepAliveTime: 空闲的时间, 超过这个空闲时间, 线程将被回收
unit: 空闲时间的时间单位
workQueue: 任务的排队队列, 当线程都运行的时候, 有空的线程将从队列汇总进行拿取
threadFactroy: 当核心线程小于满线程的时候, 又需要多加线程, 则需要从工厂中获取线程
handler: 拒绝策略, 当线程过多的时候的策略
线程池针对于任务的执行顺序
首先任务过来之后, 看看 corePoolSize 是否有空闲的, 有的话就执行. 没有的话, 放入任务队列里面. 然后任务队列会通知线程工厂, 赶紧造几个线程, 来执行. 当任务超过了最大的线程数, 就执行拒绝策略, 拒绝执行.
submit 方法
线程池建立完毕之后, 我们就需要往线程池提交任务. 通过线程池的 submit 方法即可.
submit 方法接收两种 Runable 和 Callable.
区别如下:
Runable 是实现该接口的 run 方法, callable 是实现接口的 call 方法.
callable 允许使用返回值.
callable 允许抛出异常.
提交任务的方式
Futuresubmit(Callabletask): 这种方式可以拿到返回的结果.
void execute(Runnable command): 这种方式拿不到.
Future<?> submit(Runnable task): 这种方式可以 get, 但是永远是 null.
blockqueue 的限制
我们在创建线程池的时候, 如果使用 Executors. 创建的是无界队列, 容易造成 oom. 所以我们要自己执行 queue 的大小.
BlockingQueue queue = new ArrayBlockingQueue<>(512)
拒绝策略
当任务队列的 queue 满了的时候, 在提交任务, 就要触发拒绝策略. 队列中默认的拒绝策略是 AbortPolicy. 是直接抛出异常的一种策略.
如果是想实现自定义的策略, 可以实现 RejectedExecutionHandler 接口.
线程池提供了如下的几种策略供选择.
AbortPolicy: 默认策略, 抛出 RejectedExecutionException
DiscardPolicy: 忽略当前提交的任务
DiscardOldestPolicy: 丢弃任务队列中最老的任务, 给新任务腾出地方
CallerRunsPolicy: 由提交任务者执行这个任务
- ExecutorService executorService = new ThreadPoolExecutor(2, 2,
- 0, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(512),
- new ThreadPoolExecutor.DiscardPolicy());
捕捉异常
如之前所说 Callable 接口的实现, 可以获取到结果和异常. 通过返回的 Future 的 get 方法即可拿到.
- ExecutorService executorService = Executors.newFixedThreadPool(5);
- Future<Object> future = executorService.submit(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- throw new RuntimeException("exception");// 该异常会在调用 Future.get() 时传递给调用者
- }
- });
- try {
- Object result = future.get();
- } catch (InterruptedException e) {
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
正确构造线程池的方式
- int poolSize = Runtime.getRuntime().availableProcessors() * 2;
- BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
- RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();
- executorService = new ThreadPoolExecutor(poolSize, poolSize,
- 0, TimeUnit.SECONDS,
- queue,
- policy);
获取单个结果
通过 submit 提交一个任务后, 可以获取到一个 future, 调用 get 方法会阻塞并等待执行结果. get(long timeout, TimeUnit unit) 可以指定等待的超时时间.
获取多个结果
可以使用循环依次调用, 也可以使用 ExecutorCompletionService. 该类的 take 方式, 会阻塞等待某一任务完成. 向 CompletionService 批量提交任务后, 只需调用相同次数的 CompletionService.take() 方法, 就能获取所有任务的执行结果, 获取顺序是任意的, 取决于任务的完成顺序.
- void solve(Executor executor, Collection<Callable<Result>> solvers)
- throws InterruptedException, ExecutionException {
- CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor);// 构造器
- for (Callable<Result> s : solvers)// 提交所有任务
- ecs.submit(s);
- int n = solvers.size();
- for (int i = 0; i <n; ++i) {// 获取每一个完成的任务
- Result r = ecs.take().get();
- if (r != null)
- use(r);
- }
- }
这个类是对线程池的一个包装, 包装完后, 听过他进行 submit 和 take.
单个任务超时
Future.get(long timeout, TimeUnit unit). 方法可以指定等待的超时时间, 超时未完成会抛出 TimeoutException.
多个任务超时
等待多个任务完成, 并设置最大等待时间, 可以通过 CountDownLatch 完成:
- public void testLatch(ExecutorService executorService, List<Runnable> tasks)
- throws InterruptedException{
- CountDownLatch latch = new CountDownLatch(tasks.size());
- for(Runnable r : tasks){
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try{
- r.run();
- }finally {
- latch.countDown();// countDown
- }
- }
- });
- }
- latch.await(10, TimeUnit.SECONDS); // 指定超时时间
- }
await 是总的时间, 即使 100 个任务, 需要跑 20 分钟. 我 10s 超时了 也停止了.
来源: https://www.cnblogs.com/jichi/p/12560235.html