本文主要整理博主遇到的 Java 多线程的相关知识点, 适合速记, 故命名为 "小抄集". 本文没有特别重点, 每一项针对一个多线程知识做一个概要性总结, 也有一些会带一点例子, 习题方便理解和记忆.
27. ConcurrentHashMap
ConcurrentHashMap 是线程安全的 HashMap, 内部采用分段锁来实现, 默认初始容量为 16, 装载因子为 0.75f, 分段 16, 每个段的 HashEntry<K,V>[]大小为 2. 键值都不能为 null. 每次扩容为原来容量的 2 倍, ConcurrentHashMap 不会对整个容器进行扩容, 而只对某个 segment 进行扩容. 在获取 size 操作的时候, 不是直接把所有 segment 的 count 相加就可以可到整个 ConcurrentHashMap 大小, 也不是在统计 size 的时候把所有的 segment 的 put, remove, clean 方法全部锁住, 这种方法太低效. 在累加 count 操作过程中, 之前累加过的 count 发生变化的几率非常小, 所有 ConcurrentHashMap 的做法是先尝试 2(RETRIES_BEFORE_LOCK)次通过不锁住 Segment 的方式统计各个 Segment 大小, 如果统计的过程中, 容器的 count 发生了变化, 再采用加锁的方式来统计所有的 Segment 的大小.
28. 线程安全的非阻塞队列
非阻塞队列有 ConcurrentLinkedQueue, ConcurrentLinkedDeque. 元素不能为 null. 以 ConcurrentLinkedQueue 为例, 有头 head 和尾 tail 两个指针, 遵循 FIFO 的原则进行入队和出队, 方法有 add(E e), peek()取出不删除, poll()取出删除, remove(Object o),size(), contains(Object o), addAll(Collection c), isEmpty().ConcurrentLinkedDeque 是双向队列, 可以在头和尾两个方向进行相应的操作.
29. 阻塞队列
阻塞队列 (BlockingQueue) 是一个支持两个附加操作的队列. 这两个附加的操作支持阻塞的插入和移除方法.
支持阻塞的插入方法: 意思是当队列满时, 队列会阻塞插入元素的线程, 直到队列不满.
支持阻塞的移除方法: 意思是队列为空时, 获取元素的线程会等待队列变为非空.
任何阻塞队列中的元素都不能为 null.
30. 阻塞队列的插入和移除操作处理方式:
方法 - 处理方法 抛出异常 返回特殊值 可能阻塞等待 可设定等待时间
入队 add(e) offer(e) put(e) offer(e,timeout,unit)
出队 remove() poll() take() poll(timeout,unit)
查看 element() peek() 无 无
如果是无界队列, 队列不可能出现满的情况, 所以使用 put 或 offer 方法永远不会被阻塞, 而且使用 offer 方法时, 该方法永远返回 true.
31. Java 里的阻塞队列
ArrayBlockingQueue: 一个由数组结构组成的有界阻塞队列.
LinkedeBlockingQueue: 一个有链表结构组成的有界阻塞队列.
PriorityBlockingQueue: 一个支持优先级排序的无界阻塞队列
DelayQueue: 一个使用优先级队列实现的无界阻塞队列.
SynchronousQueue: 一个不存储元素的阻塞队列.
LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列.
LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列.
32. ArrayBlockingQueue
此队列按照 FIFO 的原则对元素进行排序, 可以设定为公平 ArrayBlockingQueue(int capacity, boolean fair), 默认为不公平. 初始化时必须设定容量大小 ArrayBlockingQueue(int capactiy).
33. LinkedBlockingQueue
与 ArrayBlockingQueue 一样, 按照 FIFO 原则进行排序, 与 ArrayBlockingQueue 不同的是内部实现是一个链表结构, 且不能设置为公平的. 默认和最大长度为 Integer.MAX_VALUE.
34. PriorityBlockingQueue
是一个支持优先级的无界阻塞队列, 默认初始容量为 11, 默认情况下采用自然顺序升序排列, 不能保证同优先级元素的顺序. 内部元素要么实现 Comparable 接口, 要么在初始化的时候指定构造函数的 Comparator 来对元素进行排序, 有关 Comparable 与 Comparator 的细节可以参考: Comparable 与 Comparator 浅析.
- http://blog.csdn.net/u013256816/article/details/50899416
- 35. DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列. 内部包含一个 PriorityQueue 来实现, 队列中的元素必须实现 Delay 接口, 在创建元素时可以指定多久才能从队列中获取当前元素. 只有在延迟期满时才能从队列中提取元素.
DelayQueue 非常有用, 可以将 DelayQueue 运用在下面应用场景.
- 缓存系统的设计: 可以用 DelayQueue 保存缓存元素的有效期, 使用一个线程循环查询 DelayQueue, 一旦能从 DelayQueue 中获取元素时, 表示缓存有效期到了.
- 定时任务调度: 使用 DelayQueue 保存当天将会执行的任务和执行时间, 一旦从 DelayQueue 中获取到任务就开始执行, 比如 TimerQueue 就是使用 DelayQueue 实现的.
36. SynchronousQueue
是一个不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作, 否则不能继续添加元素, 非常适合传递性场景. 支持公平访问队列. 默认情况下线程采用非公平策略访问队列.
37. LinkedTransferQueue
是一个由链表结构组成的无界阻塞 TransferQueue 队列. 相对于其他阻塞队列, LinkedTransferQueue 多了 tryTransfer 和 transfer 方法.
transfer 方法: 如果当前有消费者正在等待接收元素 (消费者使用 take() 或者带时间限制的 poll 方法时),transfer 方法可以把生产者传入的元素立刻 transfer 给消费者, 如果没有消费者在等待接收元素, transfer 方法会将元素存放在队列的 tail 节点, 并等到该元素被消费者消费了才返回.
tryTransfer 方法: 用来试探生产者传入的元素是否能直接传给消费者. 如果没有消费者等待接收元素, 则返回 false. 和 transfer 方法的区别是 tryTransfer 方法无论消费者是否接收, 方法立刻返回, 而 transfer 方法是必须等到消费者消费了才返回.
38. LinkedBlockingDeque
LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列. 所谓双向队列是指可以从队列的两端插入和移除元素. 双向队列因为多了一个操作队列的入口, 在多线程同时入队时, 也就减少了一半的竞争. 相对其他的阻塞队列, LinkedBlockingDeque 多了 addFirst, addLast, offerFirst, offerLast, peekFirst, peekLast 等方法,
39. Fork/Join 框架
Fork/Join 框架是 JDK7 提供的一个用于并行执行任务的框架, 是一个把大任务切分为若干子任务并行的执行, 最终汇总每个小任务后得到大任务结果的框架. 我们再通过 Fork 和 Join 来理解下 Fork/Join 框架. Fork 就是把一个大任务划分成为若干子任务并行的执行, Join 就是合并这些子任务的执行结果, 最后得到这个大任务的结果.
使用 Fork/Join 框架时, 首先需要创建一个 ForkJoin 任务, 它提供在任务中执行 fork()和 join 操作的机制. 通常情况下, 我们不需要直接继承 ForkJoinTask, 只需要继承它的子类, Fork/Join 框架提供了两个子类: RecursiveAction 用于没有返回结果的任务; RecursiveTask 用于有返回结果的任务. ForkJoinTask 需要通过 ForkJoinPool 来执行.
任务分割出的子任务会添加到当前工作线程所维护的双端队列中, 进入队列的头部. 当一个工作线程的队列里暂时没有任务时, 它会随机从其他工作线程的队列的尾部获取一个任务.(工作窃取算法 work-stealing)
示例: 计算 1+2+3+...+100 的结果.
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.Future;
- import java.util.concurrent.RecursiveTask;
- public class CountTask extends RecursiveTask<Integer>
- {
- private static final int THRESHOLD = 10;
- private int start;
- private int end;
- public CountTask(int start, int end)
- {
- super();
- this.start = start;
- this.end = end;
- }
- @Override
- protected Integer compute()
- {
- int sum = 0;
- boolean canCompute = (end-start) <= THRESHOLD;
- if(canCompute)
- {
- for(int i=start;i<=end;i++)
- {
- sum += i;
- }
- }
- else
- {
- int middle = (start+end)/2;
- CountTask leftTask = new CountTask(start,middle);
- CountTask rightTask = new CountTask(middle+1,end);
- leftTask.fork();
- rightTask.fork();
- int leftResult = leftTask.join();
- int rightResult = rightTask.join();
- sum = leftResult+rightResult;
- }
- return sum;
- }
- public static void main(String[] args)
- {
- ForkJoinPool forkJoinPool = new ForkJoinPool();
- CountTask task = new CountTask(1,100);
- Future<Integer> result = forkJoinPool.submit(task);
- try
- {
- System.out.println(result.get());
- }
- catch (InterruptedException | ExecutionException e)
- {
- e.printStackTrace();
- }
- if(task.isCompletedAbnormally()){
- System.out.println(task.getException());
- }
- }
- }
40. 原子类
Java 中 Atomic 包里一共提供了 12 个类, 属于 4 种类型的原子更新方式, 分别是原子更新基本类型, 原子更新数组, 原子更新引用, 原子更新属性(字段).Atomic 包里的类基本都是使用 Unsafe 实现的包装类.
1)原子更新基本类型: AtomicBoolean,AtomicInteger, AtomicLong.
2)原子更新数组: AtomicIntegerArray,AtomicLongArray, AtomicReferenceArray.
3)原子更新引用类型: AtomicReference, AtomicStampedReference, AtomicMarkableReference.
4 ) 原子更新字段类型: AtomicReferenceFieldUpdater, AtomicIntegerFieldUpdater, AtomicLongFieldUpdater.
41. 原子更新基本类型
AtomicBoolean,AtomicInteger, AtomicLong 三个类提供的方法类似, 以 AtomicInteger 为例: 有 int addAndGet(int delta), boolean compareAndSet(int expect, int update), int getAndIncrement(), void lazySet(int newValue),int getAndSet(int newValue). 其中大多数的方法都是调用 compareAndSet 方法实现的, 譬如 getAndIncrement():
- public final int getAndIncrement() {
- for (;;) {
- int current = get();
- int next = current + 1;
- if (compareAndSet(current, next))
- return current;
- }
- }
- public final boolean compareAndSet(int expect, int update) {
- return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
- }
sun.misc.Unsafe 只提供三种 CAS 方法: compareAndSwapObject, compareAndSwapInt 和 compareAndSwapLong, 再看 AtomicBoolean 源码, 发现它是先把 Boolean 转换成整形, 再使用 compareAndSwapInt 进行 CAS, 原子更新 char,float,double 变量也可以用类似的思路来实现.
42. 原子更新数组
以 AtomicIntegerArray 为例, 此类主要提供原子的方式更新数组里的整形, 常用方法如下:
int addAndGet(int i, int delta): 以原子的方式将输入值与数组中索引 i 的元素相加.
boolean compareAndSet(int i, int expect, int update): 如果当前值等于预期值, 则以原子方式将数组位置 i 的元素设置成 update 值.
AtomicIntegerArray 的两个构造方法:
AtomicIntegerArray(int length): 指定数组的大小, 并初始化为 0
AtomicIntegerArray(int [] array): 对给定的数组进行拷贝.
案例:
- int value[] = new int[]{
- 1,2,3
- };
- AtomicIntegerArray aia = new AtomicIntegerArray(value);
- System.out.println(aia.getAndSet(1, 9));
- System.out.println(aia.get(1));
- System.out.println(value[1]);
运行结果: 2 9 2
43. CountDownLatch
CountDownLatch 允许一个或多个线程等待其他线程完成操作. CountDownLatch 的构造函数接收一个 int 类型的参数作为计数器, 如果你想等待 N 个点完成, 这里就传入 N(CountDownLatch(int count)).
CountDownLatch 的方法有: await(), await(long timeout, TimeUnit unit), countDown(), getCount()等.
计数器必须大于等于 0, 只是等于 0 的时候, 计数器就是零, 调用 await 方法时不会阻塞当前线程. CountDownLatch 不可能重新初始化或者修改 CountDownLatch 对象的内部计数器的值. 一个线程调用 countDown 方法 happens-before 另一个线程调用的 await()方法.
44. CyclicBarrier
让一组线程达到一个屏障时被阻塞, 知道最后一个线程到达屏障时, 屏障才会开门, 所有被屏障拦截的线程才会继续运行. CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties), 其参数表示屏障拦截的线程数量, 每个线程调用 await 方法告诉 CyclicBarrier 我已经达到了屏障, 然后当前线程被阻塞. CyclicBarrier 还提供了一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction)用于在线程达到屏障时, 优先执行 barrierAction, 方便处理更复杂的业务场景, 举例如下.
- import java.util.concurrent.BrokenBarrierException;
- import java.util.concurrent.CyclicBarrier;
- public class CyclicBarrierTest
- {
- static CyclicBarrier c = new CyclicBarrier(2,new A());
- public static void main(String[] args)
- {
- new Thread(new Runnable(){
- @Override
- public void run()
- {
- try
- {
- System.out.println(1);
- c.await();
- }
- catch (InterruptedException | BrokenBarrierException e)
- {
- e.printStackTrace();
- }
- System.out.println(2);
- }
- }).start();
- try
- {
- System.out.println(3);
- c.await();
- }
- catch (InterruptedException | BrokenBarrierException e)
- {
- e.printStackTrace();
- }
- System.out.println(4);
- }
- static class A implements Runnable
- {
- @Override
- public void run()
- {
- System.out.println(5);
- }
- }
- }
输出结果: 3 1 5 2 4
45. CyclicBarrier 和 CountDownLatch 的区别
CountDownLatch 的计数器只能使用一次, 而 CyclicBarrier 的计数器可以使用 reset()方法重置.
46. Semaphore
Semaphore(信号量)是用来控制同事访问特定资源的线程数量, 它协调各个线程, 以保证合理的使用公共资源. Semaphore 有两个构造函数: Semaphore(int permits)默认是非公平的, Semaphore(int permits, boolean fair)可以设置为公平的. 应用案例如下:
- public class SemaphoreTest
- {
- private static final int THREAD_COUNT=30;
- private static ExecutorService threadPool = Executors.newFixedThreadPool(30);
- private static Semaphore s = new Semaphore(10);
- public static void main(String[] args)
- {
- for(int i=0;i<THREAD_COUNT;i++)
- {
- final int a = i;
- threadPool.execute(new Runnable(){
- @Override
- public void run()
- {
- try
- {
- s.acquire();
- System.out.println("do something...."+a);
- s.release();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- });
- }
- threadPool.shutdown();
- }
- }
由上例可以看出 Semaphore 的用法非常的简单, 首先线程使用 Semaphore 的 acquire()方法获取一个许可证, 使用完之后调用 release()方法归还许可证. 还可以用 tryAcquire()方法尝试获取许可证. Semaphore 还提供了一些其他方法: int availablePermits()返回此信号量中当前可用的许可证数; int getQueueLength()返回正在等待获取许可证的线程数; boolean hasQueuedThreads()是否有线程正在等待获取许可证; void reducePermits(int reduction)减少 reduction 个许可证, 是个 protected 方法; Collection<Thread> getQueuedThreads()返回所有等待获取许可证的线程集合, 也是一个 protected 方法.
47. 线程间交换数据的 Exchanger
Exchanger 是一个用于线程间协作的工具类. Exchanger 用于进行线程间的数据交换. 它提供一个同步点, 在这个同步点, 两个线程可以交换彼此的数据. 这两个线程通过 exchange 方法交换数据, 如果第一个线程先执行 exchange()方法, 它会一直等待第二个线程也执行 exchange 方法. 当两个线程都到达同步点时, 这两个线程就可以交换数据, 将本现场生产出来的数据传递给对方.
- import java.util.concurrent.Exchanger;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- public class ExchangerTest
- {
- private static final Exchanger<String> exchanger = new Exchanger<>();
- private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
- public static void main(String[] args)
- {
- threadPool.execute(new Runnable(){
- @Override
- public void run()
- {
- String A = "I'm A!";
- try
- {
- String B = exchanger.exchange(A);
- System.out.println("In 1-"+Thread.currentThread().getName()+":"+B);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- });
- threadPool.execute(new Runnable(){
- @Override
- public void run()
- {
- try
- {
- String B="I'm B!";
- String A = exchanger.exchange(B);
- System.out.println("In 2-"+Thread.currentThread().getName()+":"+A);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- });
- threadPool.shutdown();
- }
- }
输出结果:
- In 2-pool-1-thread-2: I'm A!
- In 1-pool-1-thread-1: I'm B!
如果两个线程有一个没有执行 exchange(V x)方法, 则会一直等待, 如果担心有特殊情况发生, 避免一直等待, 可以使用 exchange(V x, long timeout, TimeUnit unit)设置最大等待时长.
48. Java 中的线程池 ThreadPoolExecutor
可以通过 ThreadPoolExecutor 来创建一个线程池:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
1. corePoolSize(线程池基本大小): 当向线程池提交一个任务时, 若线程池已创建的线程数小于 corePoolSize, 即便此时存在空闲线程, 也会通过创建一个新线程来执行该任务, 直到已创建的线程数大于或等于 corePoolSize 时, 才会根据是否存在空闲线程, 来决定是否需要创建新的线程. 除了利用提交新任务来创建和启动线程(按需构造), 也可以通过 prestartCoreThread() 或 prestartAllCoreThreads() 方法来提前启动线程池中的基本线程.
2. maximumPoolSize(线程池最大大小): 线程池所允许的最大线程个数. 当队列满了, 且已创建的线程数小于 maximumPoolSize, 则线程池会创建新的线程来执行任务. 另外, 对于无界队列, 可忽略该参数.
3. keepAliveTime(线程存活保持时间): 默认情况下, 当线程池的线程个数多于 corePoolSize 时, 线程的空闲时间超过 keepAliveTime 则会终止. 但只要 keepAliveTime 大于 0,allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程. 另外, 也可以使用 setKeepAliveTime()动态地更改参数.
4. unit(存活时间的单位): 时间单位, 分为 7 类, 从细到粗顺序: NANOSECONDS(纳秒),MICROSECONDS(微妙),MILLISECONDS(毫秒),SECONDS(秒),MINUTES(分),HOURS(小时),DAYS(天);
5. workQueue(任务队列): 用于传输和保存等待执行任务的阻塞队列. 可以使用此队列与线程池进行交互:
如果运行的线程数少于 corePoolSize, 则 Executor 始终首选添加新的线程, 而不进行排队.
如果运行的线程数等于或多于 corePoolSize, 则 Executor 始终首选将请求加入队列, 而不添加新的线程.
如果无法将请求加入队列, 则创建新的线程, 除非创建此线程超出 maximumPoolSize, 在这种情况下, 任务将被拒绝.
6. threadFactory(线程工厂): 用于创建新线程. 由同一个 threadFactory 创建的线程, 属于同一个 ThreadGroup, 创建的线程优先级都为 Thread.NORM_PRIORITY, 以及是非守护进程状态. threadFactory 创建的线程也是采用 new Thread()方式, threadFactory 创建的线程名都具有统一的风格: pool-m-thread-n(m 为线程池的编号, n 为线程池内的线程编号);
7. handler(线程饱和策略): 当线程池和队列都满了, 则表明该线程池已达饱和状态.
ThreadPoolExecutor.AbortPolicy: 处理程序遭到拒绝, 则直接抛出运行时异常 RejectedExecutionException.(默认策略)
ThreadPoolExecutor.CallerRunsPolicy: 调用者所在线程来运行该任务, 此策略提供简单的反馈控制机制, 能够减缓新任务的提交速度.
ThreadPoolExecutor.DiscardPolicy: 无法执行的任务将被删除.
ThreadPoolExecutor.DiscardOldestPolicy: 如果执行程序尚未关闭, 则位于工作队列头部的任务将被删除, 然后重新尝试执行任务(如果再次失败, 则重复此过程).
可以使用两个方法向线程池提交任务, 分别为 execute()和 submit()方法. execute()方法用于提交不需要返回值的任务, 所以无法判断任务是否被线程池执行成功. submit()方法用于提交需要返回值的任务, 线程池会返回一个 Future 类型的对象, 通过这个对象可以判断任务是否执行成功. 如 Future<Object> future = executor.submit(task);
利用线程池提供的参数进行监控, 参数如下:
getTaskCount(): 线程池需要执行的任务数量.
getCompletedTaskCount(): 线程池在运行过程中已完成的任务数量, 小于或等于 taskCount.
getLargestPoolSize(): 线程池曾经创建过的最大线程数量, 通过这个数据可以知道线程池是否满过. 如等于线程池的最大大小, 则表示线程池曾经满了.
getPoolSize(): 线程池的线程数量. 如果线程池不销毁的话, 池里的线程不会自动销毁, 所以这个大小只增不减.
getActiveCount(): 获取活动的线程数.
49. shutdown 和 shutdownNow
可以调用线程池的 shutdown 或者 shutdownNow 方法来关闭线程池. 他们的原理是遍历线程池的工作线程, 然后逐个调用线程的 interrupt 方法来中断线程, 所以无法响应中断的任务可能永远无法停止.
区别: shutdown 方法将执行平缓的关闭过程: 不在接收新的任务, 同时等待已提交的任务执行完成 -- 包括哪些还未开始执行的任务. shutdownNow 方法将执行粗暴的关闭过程: 它将尝试取消所有运行中的任务, 并且不再启动队列中尚未开始执行的任务.
只要调用了这两个关闭方法中的任意一个, isShutdown 方法就会返回 true, 当所有的任务都已关闭后, 才表示线程池关闭成功, 这时调用 isTerminated 方法会返回 true. 至于应该调用哪一种方法来关闭线程池, 应该由提交到线程池的任务特性决定, 通常调用 shutdown 方法来关闭线程池, 如果任务不一定要执行完, 则可以调用 shutdownNow 方法.
50. 扩展 ThreadPoolExecutor
可以通过继承线程池来自定义线程池, 重写线程池的 beforeExecute, afterExecute 和 terminated 方法. 在执行任务的线程中将调用 beforeExecute 和 afterExecute 等方法, 在这些方法中还可以添加日志, 计时, 监视或者统计信息收集的功能. 无论任务是从 run 中正常返回, 还是抛出一个异常而返回, afterExecute 都会被调用. 如果任务在完成后带有一个 Error, 那么就不会调用 afterExecute. 如果 beforeExecute 抛出一个 RuntimeException, 那么任务将不被执行, 并且 afterExecute 也不会被调用. 在线程池完成关闭时调用 terminated, 也就是在所有任务都已经完成并且所有工作者线程也已经关闭后, terminated 可以用来释放 Executor 在其生命周期里分配的各种资源, 此外还可以执行发送通知, 记录日志或者手机 finalize 统计等操作. 详细可以参考《 JAVA 多线程之扩展 ThreadPoolExecutor》
http://blog.csdn.net/u013256816/article/details/50403962
来源: https://juejin.im/entry/5bd45d966fb9a05d2272e960