1. 简介
使用线程池可以避免线程的频繁创建以及销毁.
JAVA 中提供的用于实现线程池的 API:
Executor,ExecutorService,AbstractExecutorService,ThreadPoolExecutor,ForkJoinPool 都位于 java.util.concurrent 包下.
*ThreadPoolExecutor,ForkJoinPool 为线程池的实现类.
- 2.Executor
- public interface Executor {
- /**
- * 向线程池提交一个任务, 交由线程池去执行
- */
- void execute(Runnable command);
- }
* 该接口声明了 execute(Runnable command) 方法, 负责向线程池中提交一个任务.
3.ExecutorService 接口
- public interface ExecutorService extends Executor {
- /**
- * 关闭线程池 (直到队列中的任务被执行完才会进行关闭)
- */
- void shutdown();
- /**
- * 立刻关闭线程池 (不执行队列中的任务, 且尝试中断当前执行的任务)
- */
- List<Runnable> shutdownNow();
- /**
- * 判断线程池是否处于 shutdown 状态.
- */
- boolean isShutdown();
- /**
- * 判断线程池是否处于 terminated 状态.
- */
- boolean isTerminated();
- /**
- * 若在指定时间内线程池处于 terminated 状态则立即返回 true, 否则超过时间后仍未为 terminated 状态则返回 false.
- */
- boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
- /**
- * 向线程池提交一个任务并返回包含指定类型的 Future(根据 Callable 的泛型)
- */
- <T> Future<T> submit(Callable<T> task);
- /**
- * 向线程池提交一个任务并指定任务执行结果的类型, 返回包含指定类型的 Future.
- */
- <T> Future<T> submit(Runnable task, T result);
- /**
- * 向线程池提交一个任务并返回未知类型的 Future.
- */
- Future<?> submit(Runnable task);
- /**
- * 向线程池提交多个任务并返回指定类型的 Future 列表.
- */
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
- /**
- * 向线程池提交多个任务并返回指定类型的 Future 列表, 如果在指定时间内没有执行完毕则直接返回.
- */
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
- /**
- * 向线程池提交多个任务, 当任意一个任务执行完毕后返回指定类型的 Future.
- */
- <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
- /**
- * 向线程池提交多个任务, 在指定时间内, 当任意一个任务执行完毕后返回指定类型的 Future, 若超时则抛出异常.
- */
- <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
- }
- public interface Future<V> {
- /**
- * 中断任务的执行
- */
- boolean cancel(boolean mayInterruptIfRunning);
- /**
- * 判断任务是否中断成功
- */
- boolean isCancelled();
- /**
- * 判断任务是否执行完成
- */
- boolean isDone();
- /**
- * 获取任务的执行结果直到任务执行完毕 (阻塞线程)
- */
- V get() throws InterruptedException, ExecutionException;
- /**
- * 获取任务的执行结果, 若在指定时间内任务仍然没有执行完毕则抛出 TimeoutException
- */
- V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
- }
*execute() 方法不能获取任务的执行结果, 而 submit() 方法能够根据返回的 Future 实例获取任务的执行结果.
4.ThreadPoolExecutor
ThreadPoolExecutor 声明的核心属性
corePoolSize: 线程池中核心线程的数量.
maximumPoolSize: 线程池中最大线程数.
keepAliveTime: 线程的空闲时间.
unit: 修饰线程空闲时间的单位.
workQueue: 任务队列.
threadFactory: 线程工厂, 用于创建线程.
handler: 当队列已满且当前线程数已达到所允许的最大值时的处理策略.
* 线程池中的线程包括核心线程以及普通线程, 核心线程一旦创建后直到线程池被关闭前都就不会销毁, 而普通线程会因为到达空闲时间而被销毁.
构造方法:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
BlockingQueue 的类型
BlockingQueue 提供了 ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue 等实现类.
1.ArrayBlockingQueue: 使用顺序表的结构进行存储, 在使用时需要指定其长度, 支持公平锁 / 非公平锁进行操作.
2.LinkedBlockingQueue: 使用链表的结构进行存储, 在使用时不需要指定其长度, 队列的长度为 Integer.MAX_VALUE.
3.SynchronousQueue: 一个不存储元素的队列, 每一个 put 操作必须等待 take 操作, 否则不能添加元素, 支持公平锁和非公平锁.
* 这些实现类在进行入队和出队操作时都会进行加锁, 以保证在并发访问时数据的完整性.
队列已满且线程数已达到所允许的最大值时的处理策略
RejectedExecutionHandler 提供了 AbortPolicy,DiscardPolicy,DiscardOlderstPolicy,CallerUnsPolicy 四个策略, 这四个策略都是 ThreadPoolExecutor 的静态内部类.
1.AbortPolicy: 放弃任务并抛出 RejectedExecutionException 异常.
2.DiscardPolicy: 放弃任务但不抛出异常.
3.DiscardOlderstPolicy: 放弃队头中的任务, 然后重新尝试执行新任务.
4.CallerUnsPolicy: 由调用线程来处理该任务.
线程池的状态
- private static final int RUNNING = -1;
- private static final int SHUTDOWN = 0;
- private static final int STOP = 1;
- private static final int TIDYING = 2;
- private static final int TERMINATED = 3;
1.RUNING: 线程池处于运行状态, 此时可以接受新的任务请求, 并且执行队列中的任务.
2.SHUTDOWN: 线程池处于关闭状态, 此时不接受新的任务请求, 但会继续执行队列中的任务.
3.STOP: 线程池处于禁用状态, 此时不接受新的任务请求, 并且不执行队列中的任务.
4.TIDYING: 线程池处于整理状态, 此时没有正在执行的任务
5.TERMINATED : 线程池处于终止状态.
线程池状态变化过程
1. 当线程池创建后处于 RUNNING 状态.
2.1 若此时调用了 shutdown() 方法, 那么线程池将处于 SHUTDOWN 状态, 不接受新的任务请求, 但会继续执行队列中的任务, 当队列中的任务为空且没有正在执行的任务时, 线程池的状态为 TIDYING.
2.2 若此时调用了 shutdownNow() 方法, 那么线程池将处于 STOP 状态, 不接受新的任务请求且不执行队列中的任务, 此时线程池的状态为 TIDYING.
3. 当线程池的状态为 TIDYING 时, 当 terminated() 方法处理完毕后, 线程池将处于 TRRMINATED 状态.
任务的执行流程
1. 当调用了 execute() 或者 submit() 方法向线程池提交任务后, 首先判断当前线程池中的线程个数是否大于核心线程数.
2. 如果当前线程池的线程个数小于核心线程数, 则创建一个核心线程来处理任务.
3. 如果当前线程池的线程个数大于核心线程数, 则将任务放入到队列中, 如果放入队列成功, 那么该任务将等待被空闲的线程处理, 如果放入队列失败 (队满), 则判断当前线程池中的线程个数是否达到所允许的线程最大值, 若没达到则创建一个普通线程去处理任务, 否则根据预定义的处理策略去进行处理.
5.Executors 工具类
JAVA 中提供了 Executors 工具类, 用于直接创建 Executor.
- CacheThreadPool
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
- }
CacheThreadPool 创建的都是普通线程 (其核心线程数为 0), 线程池最大的线程数为 Integer.MAX_VALUE, 线程的空闲时间为 60 秒, 此方式适合大量耗时短的任务, 不适合大量耗时长的任务.
* 由于创建的都是普通线程, 且空闲时间为 60 秒, 则仍有可能会频繁的创建线程.
- FixedThreadPool
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
- }
FixedThreadPool 创建的都是核心线程, 其线程个数由构建方法的入参决定, 线程不会因为空闲时间而被销毁, 适合预知任务数量的业务.
- SingleThreadExector
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,new LinkedBlockingQueue<Runnable>()));
- }
SingleThreadExector 使用一个核心线程来处理任务.
- ScheduledThreadPool
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
- return new ScheduledThreadPoolExecutor(corePoolSize);
- }
*ScheduledThreadPool 支持定时执行任务以及固定间隔执行任务.
- SingleThreadScheduledExecutor
- public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
- return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
- }
*SingleThreadScheduledExecutor 支持一个线程的定时执行任务以及固定间隔执行任务.
- public interface ScheduledExecutorService extends ExecutorService {
- /**
- * 在指定的延迟时间到达后执行任务一次
- */
- public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
- /**
- * 在指定的延迟时间到达后执行任务一次
- */
- public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
- /**
- * 在指定的初始化延迟时间到达后执行任务一次, 往后每隔 period 时间执行任务一次.
- */
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
- /**
- * 在指定的初始化延迟时间到达后执行任务一次, 往后每次任务执行完毕后相隔 delay 时间执行任务一次.
- */
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
- }
- WorkStealingPool
- public static ExecutorService newWorkStealingPool(int parallelism) {
- return new ForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
- }
WorkStealingPool 创建一个并行级别的线程池, 同一时刻最多只能有指定个数个线程正在执行任务, 创建时直接指定最多允许并行执行的线程个数即可 (如果不传则使用 CPU 的核数)
newWorkStealingPool 方法内部返回一个 ForkJoinPool 实例, ForkJoinPool 是 JAVA7 新提供的线程池, 同样继承 AbstactExecutorService.
来源: https://www.cnblogs.com/funyoung/p/10530986.html