只有当任务都是同类型并且相互独立时, 线程池的性能才能达到最佳. 如果将运行时间较长的与运行时间较短的任务混合在一起, 那么除非线程池很大, 否则将可能造成拥塞, 如果提交的任务依赖于其他任务, 那么除非线程池无线大, 否则将可能造成死锁.
例如饥饿死锁: 线程池中的任务需要无限等待一些必须由池中其他任务才能提供的资源或条件.
ThreadPoolExecutor 的通用构造函数:(在调用完构造函数之后可以继续定制 ThreadPoolExecutor)
- public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
- TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,
- RejectedExecutionHandler handler){
- //...
- }
饱和策略:
ThreadPoolExecutor 允许提供一个 BlockingQueue 来保存等待执行的任务.
当有界队列被填满后, 饱和策略开始发挥作用. 可以通过调用 setRejectedExecutionHandler 来修改.
中止是默认的饱和策略, 该策略将抛出未检查的 RejectedExecutionException, 调用者可以捕获这个异常, 然后根据需求编写自己的处理代码.
调用者运行策略实现了一种调节机制, 该策略既不会抛弃任务, 也不会抛出异常, 而是将某些任务回退到调用者, 从而降低新任务的流量.
例如对于 webServer, 当线程池中的所有线程都被占用, 并且工作队列被填满后, 下一个任务在调用 execute 时在主线程中执行.
由于执行任务需要一定的时间, 因此主线程至少在一段时间内不能提交任何任务, 从而使得工作者线程有时间来处理完正在执行的任务.
在这期间, 主线程不会调用 accept, 因此到达的请求将被保存在 TCP 层的队列中而不是在应用程序的队列中, 如果持续过载, 那么 TCP 层最终发现它的请求队列被填满, 同样会开始抛弃请求.
因此当服务器过载时, 这种过载会逐渐向外蔓延开来 --- 从线程池到工作队列到应用程序再到 TCP 层, 最终到达客户端, 导致服务器在高负载下实现一种平缓的性能降低.
exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
当工作队列被填满后, 没有预定于的饱和策略来阻塞 execute. 而通过 Semaphore 来现在任务的到达率, 可以实现.
- /**
- * 设置信号量的上界设置为线程池的大小加上可排队任务的数量, 控制正在执行和等待执行的任务数量.
- */
- public class BoundedExecutor {
- private final Executor exec;
- private final Semaphore semaphore;
- public BoundedExecutor(Executor exec,int bound){
- this.exec = exec;
- this.semaphore = new Semaphore(bound);
- }
- public void submitTask(final Runnable task) throws InterruptedException{
- semaphore.acquire();
- try{
- exec.execute(new Runnable(){
- public void run(){
- try{
- task.run();
- }finally{
- semaphore.release();
- }
- }
- });
- }catch(RejectedExecutionException e){
- semaphore.release();
- }
- }
- }
线程工厂
线程池配置信息中可以定制线程工厂, 在 ThreadFactory 中只定义了一个方法 newThread, 每当线程池需要创建一个新线程时都会调用这个方法.
- public interface ThreadFactory{
- Thread newThread(Runnable r);
- }
- // 示例: 将一个特定于线程池的名字传递给 MyThread 的构造函数, 从而可以再线程转储和错误日志信息中区分来自不同线程池的线程.
- public class MyThreadFactory implements ThreadFactory{
- private final String poolName;
- public MyThreadFactory(String poolName){
- this.poolName = poolName;
- }
- public Thread newThread(Runnable runnable){
- return new MyThread(runnable,poolName);
- }
- }
- // 示例: 为线程指定名字, 设置自定义 UncaughtExceptionHandler 向 Logger 中写入信息及维护一些统计信息以及在线程被创建或者终止时把调试消息写入日志.
- public class MyThread extends Thread{
- public static final String default_name = "myThread";
- private static volatile boolean debugLifecycle = false;
- private static final AtomicInteger created = new AtomicInteger();
- private static final AtomicInteger alive = new AtomicInteger();
- private static final Logger log = Logger.getAnonymousLogger();
- public MyThread(Runnable runnable){
- this(runnable,default_name);
- }
- public MyThread(Runnable runnable, String defaultName) {
- super(runnable,defaultName + "-" + created.incrementAndGet());
- setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- log.log(Level.SEVERE,"uncaught in thread" + t.getName(), e);
- }
- });
- }
- public void run(){
- boolean debug = debugLifecycle;
- if(debug){
- log.log(Level.FINE,"created" + getName());
- }
- try{
- alive.incrementAndGet();
- super.run();
- }finally{
- alive.decrementAndGet();
- if(debug){
- log.log(Level.FINE,"Exiting" + getName());
- }
- }
- }
- }
扩展 ThreadPoolExecutor
在线程池完成关闭操作时调用 terminated, 也就是在所有任务都已经完成并且所有工作者线程也已经关闭后. terminated 可以用来释放 Executor 在其生命周期里分配的各种资源, 此外还可以执行发送通知, 记录日志或者收集 finalize 统计信息等操作.
示例: 给线程池添加统计信息
- /**
- * TimingThreadPool 中给出了一个自定义的线程池, 通过 beforeExecute,afterExecute,terminated 等方法来添加日志记录和统计信息收集.
- * 为了测量任务的运行时间, beforeExecute 必须记录开始时间并把它保存到一个 afterExecute 可用访问的地方.
- * 因为这些方法将在执行任务的线程中调用, 因此 beforeExecute 可以把值保存到一个 ThreadLocal 变量中. 然后由 afterExecute 来取.
- * 在 TimingThreadPool 中使用了两个 AtomicLong 变量, 分别用于记录已处理的任务和总的处理时间, 并通过包含平均任务时间的日志消息.
- */
- public class TimingThreadPool extends ThreadPoolExecutor{
- public TimingThreadPool(int corePoolSize, int maximumPoolSize,
- long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
- }
- private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
- private final Logger log = Logger.getLogger("TimingThreadPool");
- private final AtomicLong numTasks = new AtomicLong();
- private final AtomicLong totalTime = new AtomicLong();
- protected void beforeExecute(Thread t,Runnable r){
- super.beforeExecute(t, r);
- log.fine(String.format("Thread %s: start %s", t,r));
- startTime.set(System.nanoTime());
- }
- protected void afterExecute(Throwable t,Runnable r){
- try{
- long endTime = System.nanoTime();
- long taskTime = endTime - startTime.get();
- numTasks.incrementAndGet();
- totalTime.addAndGet(taskTime);
- log.fine(String.format("Thread %s: end %s, time=%dns", t,r,taskTime));
- }finally{
- super.afterExecute(r, t);
- }
- }
- protected void terminated(){
- try{
- log.info(String.format("Terminated: avg time=%dns", totalTime.get()/numTasks.get()));
- }finally{
- super.terminated();
- }
- }
- }
- # 笔记内容来自 《java 并发编程实战》
来源: https://www.cnblogs.com/shanhm1991/p/9899720.html