AbortPolicy: 直接抛出异常 (默认策略), 就算线程池有空闲了, 后面的线程也无法在运行了, 要想后面的线程可以运行, 要捕获异常信息.
CallerRunsPolicy: 该策略直接在调用者线程中运行当前被丢弃的任务. 显然这样做不会真的丢弃任务, 但是任务提交线程的性能极有可能会急剧下降.
DiscardOldestPolicy: 将丢弃最老的一个请求, 也就是即将被执行的一个任务, 并尝试再次提交当前任务.
DiscardPolicy: 默默丢弃无法处理的任务, 不予任何处理. 如果允许任务丢失, 这可能是最好的一种解决方案. 在线程池不空闲的时候, 提交的任务都将丢弃, 当有空闲的线程时提交的任务会执行.
下面是 jdk 的拒绝策略源码 - intsmaze
- public static class CallerRunsPolicy implements RejectedExecutionHandler {
- public CallerRunsPolicy() { }
- /**
- * 直接在调用者线程中运行当前被丢弃的任务
- */
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- if (!e.isShutdown()) {
- r.run();
- }
- }
- }
- public static class AbortPolicy implements RejectedExecutionHandler {
- public AbortPolicy() { }
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- throw new RejectedExecutionException("Task" + r.toString() +
- "rejected from" +
- e.toString());
- }
- }
- public static class DiscardPolicy implements RejectedExecutionHandler {
- public DiscardPolicy() { }
- /**
- * Does nothing, which has the effect of discarding task r.
- */
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- }
- }
- public static class DiscardOldestPolicy implements RejectedExecutionHandler {
- public DiscardOldestPolicy() { }
- /**
- * 将丢弃最老的一个请求, 也就是即将被执行的一个任务, 并尝试再次提交当前任务.
- */
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- if (!e.isShutdown()) {
- e.getQueue().poll();
- e.execute(r);
- }
- }
- }
总结: AbortPolicy 策略下, 我们要 catch 异常, 这样我们可以捕获到哪些任务被丢弃了. 如果采用其他的策略, 丢弃的任务无法定位的, 只能通过下列程序中 es.submit(new MyTask(i)); 任务之前打印该任务, 运行任务的 run() 逻辑是, 在打印任务信息, 两处日志比对来定位哪些任务被丢弃了.
- public class MyTask implements Runnable
- {
- private int number;
- public MyTask(int number) {
- super();
- this.number = number;
- }
- public void run() {
- System.out.println(System.currentTimeMillis()+"thread id:"+Thread.currentThread().getId()+"==="+number);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- public static void main(String[] args) {
- // ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,
- // new ArrayBlockingQueue<Runnable>(1), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
- // ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,
- // new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
- // ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,
- // new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());
- ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,
- new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy());
- for(int i=0;i<10000;i++)
- {
- try {
- System.out.println(i);
- es.submit(new MyTask(i));
- Thread.sleep(100);
- } catch (Exception e) {
- e.printStackTrace();
- System.out.println("------------------------"+i);
- }
- }
- }
线程池执行逻辑源码解析 - intsmaze
- public Future<?> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<Void> ftask = newTaskFor(task, null);
- execute(ftask);
- return ftask;
- }
- /**
- * Executes the given task sometime in the future. The task
- * may execute in a new thread or in an existing pooled thread.
- *
- * If the task cannot be submitted for execution, either because this
- * executor has been shutdown or because its capacity has been reached,
- * the task is handled by the current {@code RejectedExecutionHandler}.
- *
- * @param command the task to execute
- * @throws RejectedExecutionException at discretion of
- * {@code RejectedExecutionHandler}, if the task
- * cannot be accepted for execution
- * @throws NullPointerException if {@code command} is null
- */
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- /*
- * Proceed in 3 steps:
- *
- * 1. If fewer than corePoolSize threads are running, try to
- * start a new thread with the given command as its first
- * task. The call to addWorker atomically checks runState and
- * workerCount, and so prevents false alarms that would add
- * threads when it shouldn't, by returning false.
- * 如果少于 corePoolSize 线程正在运行, 首先尝试用给定的命令启动一个新的线程任务.
- 自动调用 addWorker 检查 runState 和 workerCount,
- * 2. If a task can be successfully queued, then we still need
- * to double-check whether we should have added a thread
- * (because existing ones died since last checking) or that
- * the pool shut down since entry into this method. So we
- * recheck state and if necessary roll back the enqueuing if
- * stopped, or start a new thread if there are none.
- * 如果任务可以成功排队, 那么我们仍然需要
- 仔细检查我们是否应该添加一个线程
- (因为现有的自从上次检查后死亡) 或者那个
- 自进入该方法以来, 该池关闭. 所以我们
- 重新检查状态, 如果有必要的话回滚队列
- 停止, 或者如果没有的话就开始一个新的线程.
- * 3. If we cannot queue task, then we try to add a new
- * thread. If it fails, we know we are shut down or saturated
- * and so reject the task.
- */
- int c = ctl.get();
- if (workerCountOf(c) <corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);// 队列满了, 执行拒绝策略
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- else if (!addWorker(command, false))
- reject(command);
- }
- final void reject(Runnable command) {
- handler.rejectedExecution(command, this);// 这里就是调用我们传入的拒绝策略对象的方法
- }
- /**
- * Dispatch an uncaught exception to the handler. This method is
- * intended to be called only by the JVM.
- */
- private void dispatchUncaughtException(Throwable e) {
- getUncaughtExceptionHandler().uncaughtException(this, e);
- }
jdk 的线程池实现类 - intsmaze
newFixedThreadPoo-intsmaze
任务队列为 LinkedBlockingQueue 中 (长度无限), 线程数量和最大线程数量相同. 功能参考前面的任务队列总结.
- ExecutorService es=Executors.newFixedThreadPool(5);// 参数同时指定线程池中线程数量为 5, 最大线程数量为 5public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
- }
- newSingleThreadExecutor-intsmaze
任务队列
LinkedBlockingQueue
中 (长度无限),
线程数量和最大线程数量均为 1.
- ExecutorService es=Executors.newSingleThreadExecutor();// 线程池中线程数量和最大线程数量均为 1.public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>()));
- }
- newCachedThreadPool-intsmaze
任务队列为 SynchronousQueue, 线程数量为 0, 最大线程数量为 Integer.MAX_VALUE, 所以只要有任务没有空闲线程就会创建就新线程.
- ExecutorService es=Executors.newCachedThreadPool();
- // 指定线程池中线程数量为 0, 最大线程数量为 Integer.MAX_VALUE, 任务队列为 SynchronousQueuepublic static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- }
newScheduledThreadPool- - 定时线程 - intsmaze
任务队列为 new DelayedWorkQueue(), 返回的对象在 ExecutorService 接口上扩展了在指定时间执行某认为的功能, 在某个固定的延时之后执行或周期性执行某个任务.
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
- return new ScheduledThreadPoolExecutor(corePoolSize);
- }
- public ScheduledThreadPoolExecutor(int corePoolSize) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue());
- }
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
- Executors.defaultThreadFactory(), defaultHandler);
- }
newSingleThreadScheduledExecutor- - 定时线程 - intsmaze
相当于 newScheduledThreadPool(
- int
- corePoolSize)
中 corePoolSize 设置为 1.
ScheduledExecutorService es=Executors.newSingleThreadScheduledExecutor();
延迟线程池
- class MyScheduledTask implements Runnable
- {
- private String tname;
- public MyScheduledTask(String tname)
- {
- this.tname=tname;
- }
- public void run()
- {
- System.out.println(tname+"任务时延 2 秒执行!!!");
- }
- }
- public class intsmaze
- {
- public static void main(String[] args)
- {
- ScheduledExecutorService scheduledThreadPool
- =Executors.newScheduledThreadPool(2);
- MyScheduledTask mt1=new MyScheduledTask("MT1");
- scheduledThreadPool.schedule(mt1,2,TimeUnit.SECONDS);
- }
- }
- newWorkStealingPool
java8 新增连接池 - intsmaze
- public static ExecutorService newWorkStealingPool(int parallelism) {
- return new ForkJoinPool
- (parallelism,
- ForkJoinPool.defaultForkJoinWorkerThreadFactory,
- null, true);
- }// 创建指定数量的线程池来执行给定的并行级别, 还会使用多个队列减少竞争
- public static ExecutorService newWorkStealingPool() {
- return new ForkJoinPool
- (Runtime.getRuntime().availableProcessors(),
- ForkJoinPool.defaultForkJoinWorkerThreadFactory,
- null, true);
- }// 前一个方法的简化, 如果当前机器有 4 个 CPU, 则目标的并行级别被设置为 4.
关闭线程池 (很少使用, 除了切换数据源时需要控制)-intsmaze
希望程序执行完所有任务后退出, 调用 ExecutorService 接口中的 shutdown(),shutdownNow() 方法.
用完一个线程池后, 应该调用该线程池的 shutdown 方法, 将启动线程池的关闭序列. 调用 shutdown 方法后, 线程池不在接收新的任务, 但是会将以前所有已经提交的任务执行完. 当线程池中的所有任务都执行完后, 线程池中的所有线程都会死亡; shutdownNow 方法会试图停止所有正在执行的活动任务, 暂停处理正在等待的任务, 并返回等待执行的任务列表.
线程池优化 - intsmaze
一般来说确定线程池的大小需要考虑 CPU 数量, 内存大小, JDBC 连接等因素. 在java 并发编程实践一书中给出了一个估算线程池大小的经验公式:
Ncpu=CPU 的数量
Ucpu = 目标 CPU 的使用率, 0<=Ucpu<=1
W/C = 等待时间与计算时间的比率
为保持处理器达到期望的使用率, 最优的线程池的大小等于:
Nthreads=Ncpu*Ucpu*(1+W/C)
在 java 中, 可以通过
Runtime.getRuntime().availableProcessors()
取得可以 CPU 数量.
来源: https://www.cnblogs.com/intsmaze/p/9432199.html