目录
简介
CountDownLatch
示例
实现分析
CountDownLatch 与 Thread.join()
CyclicBarrier
实现分析
CountDownLatch 和 CyclicBarrier 区别
简介
在编写多线程程序时, 难免需要对并发流程进行控制, Thread 类有 join()和 yield()等方法, JUC 提供了更为灵活的并发工具类, 下面就学习这些工具类的用法以及实现.
CountDownLatch
latch 意思是门闩, countdown 指从上往下数, CountDownLatch 允许一个或多个线程等待其他任务线程完成操作, 就像它的字面意思: 从大往小数, 数到某个值 (0) 的时候打开门闩. 下面是 CountDownLatch 的 API:
- // 构造器
- public CountDownLatch(int count);
- // 调用 await()方法的线程会进入等待状态, 它会等待直到 count 值为 0 才继续执行
- public void await();
- // 和 await()类似, 只不过等待一定的时间后 count 值还没变为 0 的话就会继续执行
- public boolean await(long timeout, TimeUnit unit);
- // 计数器减一
- public void countDown()
可以看到通过构造器构造一个计数器, 通过调用 countDown 方法计数减小, await 在计数器大于 0 时线程处于等待状态, 通过下面例子可以学会 CountDownLatch 的用法:
示例
- public class LatchTest {
- public static void main(String[] args) {
- // 两个线程, 计数器传入 2
- final CountDownLatch latch = new CountDownLatch(2);
- // 这两个线程执行了 latch.countDown(), 计数器归 0, 主线程才被唤醒继续执行
- new Thread(() -> {
- try {
- System.out.println("子线程 1:"+Thread.currentThread().getName()+"正在执行");
- Thread.sleep(3000);
- System.out.println("子线程 1:"+Thread.currentThread().getName()+"执行完毕");
- latch.countDown();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }).start();
- new Thread(() -> {
- try {
- System.out.println("子线程 2:"+Thread.currentThread().getName()+"正在执行");
- Thread.sleep(3000);
- System.out.println("子线程 2:"+Thread.currentThread().getName()+"执行完毕");
- latch.countDown();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }).start();
- try {
- System.out.println("等待 2 个子线程执行完毕...");
- latch.await();
- System.out.println("2 个子线程已经执行完毕");
- System.out.println("继续执行主线程");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
运行结果:
实现分析
CountDownLatch 是基于共享锁实现的, 内部类 Sync 继承同步器 AQS, 重点分析 CountDownLatch 以下三个方法:
构造方法
通过构造函数传入的参数 count 设置同步状态(count 必须大于 0, 否则抛出异常), 同步状态在这里并不表示线程获得锁的重入次数, 而是表示一个计数器, 计数器的大小与任务线程的数目是一致的,
- public CountDownLatch(int count) {
- if (count <0) throw new IllegalArgumentException("count < 0");
- this.sync = new Sync(count);
- }
- Sync(int count) {
- setState(count);
- }
- await()
调用了 await 的线程会处于等待状态, 直到计数器归 0 才会被唤醒. await 方法调用了 Sync 父类 AQS 的 acquireSharedInterruptibly 方法, acquireSharedInterruptibly 首先检查线程有中断, 然后调用 tryAcquireShared 尝试获取共享锁, 获取成功返回 1, 失败返回 - 1, 若失败调用 doAcquireSharedInterruptibly 将当前线程加入同步队列阻塞住, 等待计数器为 0 唤醒.
- public void await() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- }
- countDown()
countDown 方法将计数器减一, 调用了 AQS 的 releaseShared 方法, 当 tryReleaseShared 方法返回 true 执行 doReleaseShared 方法, 这个方法在分析读写锁是介绍过了, 就是唤醒同步等列等待获取锁的线程, 即唤醒调用了 await 方法等待计数器归 0 的线程.
- public void countDown() {
- sync.releaseShared(1);
- }
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
- tryReleaseShared(int releases)
通过循环 + CAS 的方式修改同步状态 state, 当同步状态为 0 时返回 true; 同步状态为 0, 即表示计数器归 0, 所有调用了 countDown 的线程都执行完了, 可以唤醒调用 await 等待的线程了.
- protected boolean tryReleaseShared(int releases) {
- for (;;) {
- int c = getState();
- if (c == 0)
- return false;
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
CountDownLatch 与 Thread.join()
Thread 类的 join 方法与 CountDownLatch 作用类似, join 方法的实现原理不停检查调用 join 的线程是否存活, 如果存活则让当前线程处于等待状态, 当 join 线程终止后, 会唤醒当前线程. CountDownLatch 与 join 相比更灵活, 不必非得线程中止只要调用了 countDown 方法就行了, 可以响应中断以及能够设置超时等功能.
CyclicBarrier
CyclicBarrier 是指可循环使用的屏障, 它可以让一组线程当他们分别达到了同步点 (common barrier point) 时被阻塞, 直到最后一个线程到达了同步点, 屏障才会开门, 让所有被屏障屏蔽的线程继续运行.
- public class BarrierTest {
- public static void main(String[] args) {
- int size = 4;
- CyclicBarrier barrier = new CyclicBarrier(size);
- for(int i=0;i<size;i++)
- new Writer(barrier).start();
- }
- static class Writer extends Thread{
- private CyclicBarrier cyclicBarrier;
- public Writer(CyclicBarrier cyclicBarrier) {
- this.cyclicBarrier = cyclicBarrier;
- }
- @Override
- public void run() {
- System.out.println("线程"+Thread.currentThread().getName()+"is coming...");
- try {
- // 睡眠模拟业务操作
- Thread.sleep(5000);
- System.out.println("线程"+Thread.currentThread().getName()+"is waiting on barrier");
- cyclicBarrier.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }catch(BrokenBarrierException e){
- e.printStackTrace();
- }
- }
- }
- }
运行结果:
实现分析
类属性及构造方法
- public class CyclicBarrier {
- //CyclicBarrier 使用完了可以重置, 每使用一次都会有一个新的 Generation 对象, broken 表示当前屏障是否被损坏
- private static class Generation {
- boolean broken = false;
- }
- // 重入锁
- private final ReentrantLock lock = new ReentrantLock();
- //condition 实现线程等待与唤醒
- private final Condition trip = lock.newCondition();
- // 表示线程数, 在 parties 个线程都调用 await 方法后, barrier 才算是被通过 (tripped) 了.
- private final int parties;
- // 通过构造方法设置一个 Runnable 对象, 用来在所有线程都到达 barrier 时执行.
- private final Runnable barrierCommand;
- /** The current generation */
- private Generation generation = new Generation();
- //count 表示还剩下未到达 barrier(未调用 await)的线程数量
- private int count;
- // 构造函数
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- this.parties = parties;
- this.count = parties;
- this.barrierCommand = barrierAction;
- }
- public CyclicBarrier(int parties) {
- this(parties, null);
- }
- await()
await 重载的两种方法都是调用的 doWait 方法.
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
- public int await(long timeout, TimeUnit unit)
- throws InterruptedException,
- BrokenBarrierException,
- TimeoutException {
- return dowait(true, unit.toNanos(timeout));
- }
- doWait(boolean timed, long nanos)
doWait 是 await 的核心方法, 通过独占锁和 Condition 对象让线程阻塞等待, 具体先判断当前线程是不是最后一个执行 await 方法的线程, 如果不是, 调用 condition 的 await 方法让线程等待, 在这里我们看到首先线程会获得锁, 进入同步块, 在循环里让线程等待, 这里因为当前线程获得了独占锁, 它处于同步队列的 head 头节点之中, 当调用了 condition.await()方法后, 当前线程从同步队列转移到条件队列, 释放了独占锁, 所以当前线程获取独占锁并不会影响后来的线程获取独占锁, 因为当前线程进入阻塞状态已经释放了独占锁, 直到被唤醒后才会去争取获得独占锁, 到最后会在 finally 块中显示的释放.
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- // 独占锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- //Generation 对象
- final Generation g = generation;
- // 屏障被破坏, 抛出异常
- if (g.broken)
- throw new BrokenBarrierException();
- // 线程被中断
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- int index = --count;
- // 最后一个到达同步点的线程
- if (index == 0) { // tripped
- boolean ranAction = false;
- try {
- final Runnable command = barrierCommand;
- if (command != null)
- command.run();
- ranAction = true;
- nextGeneration();
- return 0;
- } finally {
- if (!ranAction)
- breakBarrier();
- }
- }
- // loop until tripped, broken, interrupted, or timed out
- // 一直循环直到最后一个线程到达同步点, 屏障破损(genneration 的 broken 属性为 true), 中断或超时
- for (;;) {
- try {
- if (!timed)
- trip.await();
- else if (nanos> 0L)
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException IE) {
- //g == generation && !g.broken 说明此时当前这一轮还没结束, 并且没有其它线程执行过
- //breakBarrier 方法. 这种情况会执行 breakBarrier 置 generation 的 broken 标识为 true 并
- // 唤醒其它线程, 之后继续抛出 InterruptedException.
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw IE;
- } else {
- // 如果 g != generation, 此时这一轮已经结束, 后面返回 index 作为到达 barrier 的次序;
- // 如果 g.broken 说明之前已经有其它线程执行了 breakBarrier 方法, 后面会抛出
- //BrokenBarrierException.
- Thread.currentThread().interrupt();
- }
- }
- if (g.broken)
- throw new BrokenBarrierException();
- if (g != generation)
- return index;
- // 超时
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- lock.unlock();
- }
- }
- breakBarrier()
损坏当前屏障, 会唤醒所有在屏障中的线程, 当线程被中断或等待超时会调用
- private void breakBarrier() {
- generation.broken = true;
- count = parties;
- trip.signalAll();
- }
- nextGeneration()
nextGeneration 方法在所有线程进入屏障后会被调用, 即生成下一个版本, 所有线程又可以重新进入到屏障中
- private void nextGeneration() {
- // signal completion of last generation
- trip.signalAll();
- // set up next generation
- count = parties;
- generation = new Generation();
- }
CountDownLatch 和 CyclicBarrier 区别
从功能上说, CountDownLatch 允许一个或多个线程等待其他线程完成操作, 而 CyclicBarrier 是让一组线程达到一个公共同步点之后再一起放行; CountDownLatch 计数器只能使用一次, CyclicBarrier 可以使用 reset 方法重置用以处理某些复杂的业务场景.
来源: https://www.cnblogs.com/rain4j/p/10183118.html